1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from contextlib import contextmanager 19from enum import IntEnum 20import logging 21import struct 22import datetime 23from typing import BinaryIO, Generator 24import os 25 26from bumble import core 27from bumble.hci import HCI_COMMAND_PACKET, HCI_EVENT_PACKET 28 29 30# ----------------------------------------------------------------------------- 31# Logging 32# ----------------------------------------------------------------------------- 33logger = logging.getLogger(__name__) 34 35 36# ----------------------------------------------------------------------------- 37# Classes 38# ----------------------------------------------------------------------------- 39class Snooper: 40 """ 41 Base class for snooper implementations. 42 43 A snooper is an object that will be provided with HCI packets as they are 44 exchanged between a host and a controller. 45 """ 46 47 class Direction(IntEnum): 48 HOST_TO_CONTROLLER = 0 49 CONTROLLER_TO_HOST = 1 50 51 class DataLinkType(IntEnum): 52 H1 = 1001 53 H4 = 1002 54 HCI_BSCP = 1003 55 H5 = 1004 56 57 def snoop(self, hci_packet: bytes, direction: Direction) -> None: 58 """Snoop on an HCI packet.""" 59 60 61# ----------------------------------------------------------------------------- 62class BtSnooper(Snooper): 63 """ 64 Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761. 65 """ 66 67 IDENTIFICATION_PATTERN = b'btsnoop\0' 68 TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1) 69 TIMESTAMP_DELTA = 0x00E03AB44A676000 70 ONE_MS = datetime.timedelta(microseconds=1) 71 72 def __init__(self, output: BinaryIO): 73 self.output = output 74 75 # Write the header 76 self.output.write( 77 self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4) 78 ) 79 80 def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None: 81 flags = int(direction) 82 packet_type = hci_packet[0] 83 if packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET): 84 flags |= 0x10 85 86 # Compute the current timestamp 87 timestamp = ( 88 int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS) 89 + self.TIMESTAMP_DELTA 90 ) 91 92 # Emit the record 93 self.output.write( 94 struct.pack( 95 '>IIIIQ', 96 len(hci_packet), # Original Length 97 len(hci_packet), # Included Length 98 flags, # Packet Flags 99 0, # Cumulative Drops 100 timestamp, # Timestamp 101 ) 102 + hci_packet 103 ) 104 105 106# ----------------------------------------------------------------------------- 107_SNOOPER_INSTANCE_COUNT = 0 108 109 110@contextmanager 111def create_snooper(spec: str) -> Generator[Snooper, None, None]: 112 """ 113 Create a snooper given a specification string. 114 115 The general syntax for the specification string is: 116 <snooper-type>:<type-specific-arguments> 117 118 Supported snooper types are: 119 120 btsnoop 121 The syntax for the type-specific arguments for this type is: 122 <io-type>:<io-type-specific-arguments> 123 124 Supported I/O types are: 125 126 file 127 The type-specific arguments for this I/O type is a string that is converted 128 to a file path using the python `str.format()` string formatting. The log 129 records will be written to that file if it can be opened/created. 130 The keyword args that may be referenced by the string pattern are: 131 now: the value of `datetime.now()` 132 utcnow: the value of `datetime.utcnow()` 133 pid: the current process ID. 134 instance: the instance ID in the current process. 135 136 Examples: 137 btsnoop:file:my_btsnoop.log 138 btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log 139 140 """ 141 if ':' not in spec: 142 raise core.InvalidArgumentError('snooper type prefix missing') 143 144 snooper_type, snooper_args = spec.split(':', maxsplit=1) 145 146 if snooper_type == 'btsnoop': 147 if ':' not in snooper_args: 148 raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing') 149 150 io_type, io_name = snooper_args.split(':', maxsplit=1) 151 if io_type == 'file': 152 # Process the file name string pattern. 153 global _SNOOPER_INSTANCE_COUNT 154 file_path = io_name.format( 155 now=datetime.datetime.now(), 156 utcnow=datetime.datetime.utcnow(), 157 pid=os.getpid(), 158 instance=_SNOOPER_INSTANCE_COUNT, 159 ) 160 161 # Open the file 162 logger.debug(f'Snoop file: {file_path}') 163 with open(file_path, 'wb') as snoop_file: 164 _SNOOPER_INSTANCE_COUNT += 1 165 yield BtSnooper(snoop_file) 166 _SNOOPER_INSTANCE_COUNT -= 1 167 return 168 169 raise core.InvalidArgumentError(f'I/O type {io_type} not supported') 170 171 raise core.InvalidArgumentError(f'snooper type {snooper_type} not found') 172