1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import logging 19 20from .colors import color 21from .att import ATT_CID, ATT_PDU 22from .smp import SMP_CID, SMP_Command 23from .core import name_or_number 24from .l2cap import ( 25 L2CAP_PDU, 26 L2CAP_CONNECTION_REQUEST, 27 L2CAP_CONNECTION_RESPONSE, 28 L2CAP_SIGNALING_CID, 29 L2CAP_LE_SIGNALING_CID, 30 L2CAP_Control_Frame, 31 L2CAP_Connection_Response, 32) 33from .hci import ( 34 HCI_EVENT_PACKET, 35 HCI_ACL_DATA_PACKET, 36 HCI_DISCONNECTION_COMPLETE_EVENT, 37 HCI_AclDataPacketAssembler, 38) 39from .rfcomm import RFCOMM_Frame, RFCOMM_PSM 40from .sdp import SDP_PDU, SDP_PSM 41from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM 42 43# ----------------------------------------------------------------------------- 44# Logging 45# ----------------------------------------------------------------------------- 46logger = logging.getLogger(__name__) 47 48 49# ----------------------------------------------------------------------------- 50PSM_NAMES = { 51 RFCOMM_PSM: 'RFCOMM', 52 SDP_PSM: 'SDP', 53 AVDTP_PSM: 'AVDTP' 54 # TODO: add more PSM values 55} 56 57 58# ----------------------------------------------------------------------------- 59class PacketTracer: 60 class AclStream: 61 def __init__(self, analyzer): 62 self.analyzer = analyzer 63 self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) 64 self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid 65 self.psms = {} # PSM, by source_cid 66 self.peer = None # ACL stream in the other direction 67 68 # pylint: disable=too-many-nested-blocks 69 def on_acl_pdu(self, pdu): 70 l2cap_pdu = L2CAP_PDU.from_bytes(pdu) 71 72 if l2cap_pdu.cid == ATT_CID: 73 att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload) 74 self.analyzer.emit(att_pdu) 75 elif l2cap_pdu.cid == SMP_CID: 76 smp_command = SMP_Command.from_bytes(l2cap_pdu.payload) 77 self.analyzer.emit(smp_command) 78 elif l2cap_pdu.cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): 79 control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload) 80 self.analyzer.emit(control_frame) 81 82 # Check if this signals a new channel 83 if control_frame.code == L2CAP_CONNECTION_REQUEST: 84 self.psms[control_frame.source_cid] = control_frame.psm 85 elif control_frame.code == L2CAP_CONNECTION_RESPONSE: 86 if ( 87 control_frame.result 88 == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL 89 ): 90 if self.peer: 91 if psm := self.peer.psms.get(control_frame.source_cid): 92 # Found a pending connection 93 self.psms[control_frame.destination_cid] = psm 94 95 # For AVDTP connections, create a packet assembler for 96 # each direction 97 if psm == AVDTP_PSM: 98 self.avdtp_assemblers[ 99 control_frame.source_cid 100 ] = AVDTP_MessageAssembler(self.on_avdtp_message) 101 self.peer.avdtp_assemblers[ 102 control_frame.destination_cid 103 ] = AVDTP_MessageAssembler( 104 self.peer.on_avdtp_message 105 ) 106 107 else: 108 # Try to find the PSM associated with this PDU 109 if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)): 110 if psm == SDP_PSM: 111 sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload) 112 self.analyzer.emit(sdp_pdu) 113 elif psm == RFCOMM_PSM: 114 rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload) 115 self.analyzer.emit(rfcomm_frame) 116 elif psm == AVDTP_PSM: 117 self.analyzer.emit( 118 f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' 119 f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}' 120 ) 121 assembler = self.avdtp_assemblers.get(l2cap_pdu.cid) 122 if assembler: 123 assembler.on_pdu(l2cap_pdu.payload) 124 else: 125 psm_string = name_or_number(PSM_NAMES, psm) 126 self.analyzer.emit( 127 f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' 128 f'PSM={psm_string}]: {l2cap_pdu.payload.hex()}' 129 ) 130 else: 131 self.analyzer.emit(l2cap_pdu) 132 133 def on_avdtp_message(self, transaction_label, message): 134 self.analyzer.emit( 135 f'{color("AVDTP", "green")} [{transaction_label}] {message}' 136 ) 137 138 def feed_packet(self, packet): 139 self.packet_assembler.feed_packet(packet) 140 141 class Analyzer: 142 def __init__(self, label, emit_message): 143 self.label = label 144 self.emit_message = emit_message 145 self.acl_streams = {} # ACL streams, by connection handle 146 self.peer = None # Analyzer in the other direction 147 148 def start_acl_stream(self, connection_handle): 149 logger.info( 150 f'[{self.label}] +++ Creating ACL stream for connection ' 151 f'0x{connection_handle:04X}' 152 ) 153 stream = PacketTracer.AclStream(self) 154 self.acl_streams[connection_handle] = stream 155 156 # Associate with a peer stream if we can 157 if peer_stream := self.peer.acl_streams.get(connection_handle): 158 stream.peer = peer_stream 159 peer_stream.peer = stream 160 161 return stream 162 163 def end_acl_stream(self, connection_handle): 164 if connection_handle in self.acl_streams: 165 logger.info( 166 f'[{self.label}] --- Removing ACL stream for connection ' 167 f'0x{connection_handle:04X}' 168 ) 169 del self.acl_streams[connection_handle] 170 171 # Let the other forwarder know so it can cleanup its stream as well 172 self.peer.end_acl_stream(connection_handle) 173 174 def on_packet(self, packet): 175 self.emit(packet) 176 177 if packet.hci_packet_type == HCI_ACL_DATA_PACKET: 178 # Look for an existing stream for this handle, create one if it is the 179 # first ACL packet for that connection handle 180 if (stream := self.acl_streams.get(packet.connection_handle)) is None: 181 stream = self.start_acl_stream(packet.connection_handle) 182 stream.feed_packet(packet) 183 elif packet.hci_packet_type == HCI_EVENT_PACKET: 184 if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT: 185 self.end_acl_stream(packet.connection_handle) 186 187 def emit(self, message): 188 self.emit_message(f'[{self.label}] {message}') 189 190 def trace(self, packet, direction=0): 191 if direction == 0: 192 self.host_to_controller_analyzer.on_packet(packet) 193 else: 194 self.controller_to_host_analyzer.on_packet(packet) 195 196 def __init__( 197 self, 198 host_to_controller_label=color('HOST->CONTROLLER', 'blue'), 199 controller_to_host_label=color('CONTROLLER->HOST', 'cyan'), 200 emit_message=logger.info, 201 ): 202 self.host_to_controller_analyzer = PacketTracer.Analyzer( 203 host_to_controller_label, emit_message 204 ) 205 self.controller_to_host_analyzer = PacketTracer.Analyzer( 206 controller_to_host_label, emit_message 207 ) 208 self.host_to_controller_analyzer.peer = self.controller_to_host_analyzer 209 self.controller_to_host_analyzer.peer = self.host_to_controller_analyzer 210