1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from contextlib import contextmanager 19from enum import IntEnum 20import logging 21import struct 22import datetime 23from typing import BinaryIO, Generator 24import os 25 26from bumble.hci import HCI_COMMAND_PACKET, HCI_EVENT_PACKET 27 28 29# ----------------------------------------------------------------------------- 30# Logging 31# ----------------------------------------------------------------------------- 32logger = logging.getLogger(__name__) 33 34 35# ----------------------------------------------------------------------------- 36# Classes 37# ----------------------------------------------------------------------------- 38class Snooper: 39 """ 40 Base class for snooper implementations. 41 42 A snooper is an object that will be provided with HCI packets as they are 43 exchanged between a host and a controller. 44 """ 45 46 class Direction(IntEnum): 47 HOST_TO_CONTROLLER = 0 48 CONTROLLER_TO_HOST = 1 49 50 class DataLinkType(IntEnum): 51 H1 = 1001 52 H4 = 1002 53 HCI_BSCP = 1003 54 H5 = 1004 55 56 def snoop(self, hci_packet: bytes, direction: Direction) -> None: 57 """Snoop on an HCI packet.""" 58 59 60# ----------------------------------------------------------------------------- 61class BtSnooper(Snooper): 62 """ 63 Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761. 64 """ 65 66 IDENTIFICATION_PATTERN = b'btsnoop\0' 67 TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1) 68 TIMESTAMP_DELTA = 0x00E03AB44A676000 69 ONE_MS = datetime.timedelta(microseconds=1) 70 71 def __init__(self, output: BinaryIO): 72 self.output = output 73 74 # Write the header 75 self.output.write( 76 self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4) 77 ) 78 79 def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None: 80 flags = int(direction) 81 packet_type = hci_packet[0] 82 if packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET): 83 flags |= 0x10 84 85 # Compute the current timestamp 86 timestamp = ( 87 int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS) 88 + self.TIMESTAMP_DELTA 89 ) 90 91 # Emit the record 92 self.output.write( 93 struct.pack( 94 '>IIIIQ', 95 len(hci_packet), # Original Length 96 len(hci_packet), # Included Length 97 flags, # Packet Flags 98 0, # Cumulative Drops 99 timestamp, # Timestamp 100 ) 101 + hci_packet 102 ) 103 104 105# ----------------------------------------------------------------------------- 106_SNOOPER_INSTANCE_COUNT = 0 107 108 109@contextmanager 110def create_snooper(spec: str) -> Generator[Snooper, None, None]: 111 """ 112 Create a snooper given a specification string. 113 114 The general syntax for the specification string is: 115 <snooper-type>:<type-specific-arguments> 116 117 Supported snooper types are: 118 119 btsnoop 120 The syntax for the type-specific arguments for this type is: 121 <io-type>:<io-type-specific-arguments> 122 123 Supported I/O types are: 124 125 file 126 The type-specific arguments for this I/O type is a string that is converted 127 to a file path using the python `str.format()` string formatting. The log 128 records will be written to that file if it can be opened/created. 129 The keyword args that may be referenced by the string pattern are: 130 now: the value of `datetime.now()` 131 utcnow: the value of `datetime.utcnow()` 132 pid: the current process ID. 133 instance: the instance ID in the current process. 134 135 Examples: 136 btsnoop:file:my_btsnoop.log 137 btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log 138 139 """ 140 if ':' not in spec: 141 raise ValueError('snooper type prefix missing') 142 143 snooper_type, snooper_args = spec.split(':', maxsplit=1) 144 145 if snooper_type == 'btsnoop': 146 if ':' not in snooper_args: 147 raise ValueError('I/O type for btsnoop snooper type missing') 148 149 io_type, io_name = snooper_args.split(':', maxsplit=1) 150 if io_type == 'file': 151 # Process the file name string pattern. 152 global _SNOOPER_INSTANCE_COUNT 153 file_path = io_name.format( 154 now=datetime.datetime.now(), 155 utcnow=datetime.datetime.utcnow(), 156 pid=os.getpid(), 157 instance=_SNOOPER_INSTANCE_COUNT, 158 ) 159 160 # Open the file 161 logger.debug(f'Snoop file: {file_path}') 162 with open(file_path, 'wb') as snoop_file: 163 _SNOOPER_INSTANCE_COUNT += 1 164 yield BtSnooper(snoop_file) 165 _SNOOPER_INSTANCE_COUNT -= 1 166 return 167 168 raise ValueError(f'I/O type {io_type} not supported') 169 170 raise ValueError(f'snooper type {snooper_type} not found') 171