• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from contextlib import contextmanager
19from enum import IntEnum
20import logging
21import struct
22import datetime
23from typing import BinaryIO, Generator
24import os
25
26from bumble import core
27from bumble.hci import HCI_COMMAND_PACKET, HCI_EVENT_PACKET
28
29
30# -----------------------------------------------------------------------------
31# Logging
32# -----------------------------------------------------------------------------
33logger = logging.getLogger(__name__)
34
35
36# -----------------------------------------------------------------------------
37# Classes
38# -----------------------------------------------------------------------------
39class Snooper:
40    """
41    Base class for snooper implementations.
42
43    A snooper is an object that will be provided with HCI packets as they are
44    exchanged between a host and a controller.
45    """
46
47    class Direction(IntEnum):
48        HOST_TO_CONTROLLER = 0
49        CONTROLLER_TO_HOST = 1
50
51    class DataLinkType(IntEnum):
52        H1 = 1001
53        H4 = 1002
54        HCI_BSCP = 1003
55        H5 = 1004
56
57    def snoop(self, hci_packet: bytes, direction: Direction) -> None:
58        """Snoop on an HCI packet."""
59
60
61# -----------------------------------------------------------------------------
62class BtSnooper(Snooper):
63    """
64    Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761.
65    """
66
67    IDENTIFICATION_PATTERN = b'btsnoop\0'
68    TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
69    TIMESTAMP_DELTA = 0x00E03AB44A676000
70    ONE_MS = datetime.timedelta(microseconds=1)
71
72    def __init__(self, output: BinaryIO):
73        self.output = output
74
75        # Write the header
76        self.output.write(
77            self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4)
78        )
79
80    def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None:
81        flags = int(direction)
82        packet_type = hci_packet[0]
83        if packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET):
84            flags |= 0x10
85
86        # Compute the current timestamp
87        timestamp = (
88            int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS)
89            + self.TIMESTAMP_DELTA
90        )
91
92        # Emit the record
93        self.output.write(
94            struct.pack(
95                '>IIIIQ',
96                len(hci_packet),  # Original Length
97                len(hci_packet),  # Included Length
98                flags,  # Packet Flags
99                0,  # Cumulative Drops
100                timestamp,  # Timestamp
101            )
102            + hci_packet
103        )
104
105
106# -----------------------------------------------------------------------------
107_SNOOPER_INSTANCE_COUNT = 0
108
109
110@contextmanager
111def create_snooper(spec: str) -> Generator[Snooper, None, None]:
112    """
113    Create a snooper given a specification string.
114
115    The general syntax for the specification string is:
116      <snooper-type>:<type-specific-arguments>
117
118    Supported snooper types are:
119
120      btsnoop
121        The syntax for the type-specific arguments for this type is:
122        <io-type>:<io-type-specific-arguments>
123
124        Supported I/O types are:
125
126        file
127          The type-specific arguments for this I/O type is a string that is converted
128          to a file path using the python `str.format()` string formatting. The log
129          records will be written to that file if it can be opened/created.
130          The keyword args that may be referenced by the string pattern are:
131            now: the value of `datetime.now()`
132            utcnow: the value of `datetime.utcnow()`
133            pid: the current process ID.
134            instance: the instance ID in the current process.
135
136    Examples:
137      btsnoop:file:my_btsnoop.log
138      btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
139
140    """
141    if ':' not in spec:
142        raise core.InvalidArgumentError('snooper type prefix missing')
143
144    snooper_type, snooper_args = spec.split(':', maxsplit=1)
145
146    if snooper_type == 'btsnoop':
147        if ':' not in snooper_args:
148            raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing')
149
150        io_type, io_name = snooper_args.split(':', maxsplit=1)
151        if io_type == 'file':
152            # Process the file name string pattern.
153            global _SNOOPER_INSTANCE_COUNT
154            file_path = io_name.format(
155                now=datetime.datetime.now(),
156                utcnow=datetime.datetime.utcnow(),
157                pid=os.getpid(),
158                instance=_SNOOPER_INSTANCE_COUNT,
159            )
160
161            # Open the file
162            logger.debug(f'Snoop file: {file_path}')
163            with open(file_path, 'wb') as snoop_file:
164                _SNOOPER_INSTANCE_COUNT += 1
165                yield BtSnooper(snoop_file)
166                _SNOOPER_INSTANCE_COUNT -= 1
167                return
168
169        raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
170
171    raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')
172