• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19
20from .colors import color
21from .att import ATT_CID, ATT_PDU
22from .smp import SMP_CID, SMP_Command
23from .core import name_or_number
24from .l2cap import (
25    L2CAP_PDU,
26    L2CAP_CONNECTION_REQUEST,
27    L2CAP_CONNECTION_RESPONSE,
28    L2CAP_SIGNALING_CID,
29    L2CAP_LE_SIGNALING_CID,
30    L2CAP_Control_Frame,
31    L2CAP_Connection_Response,
32)
33from .hci import (
34    HCI_EVENT_PACKET,
35    HCI_ACL_DATA_PACKET,
36    HCI_DISCONNECTION_COMPLETE_EVENT,
37    HCI_AclDataPacketAssembler,
38)
39from .rfcomm import RFCOMM_Frame, RFCOMM_PSM
40from .sdp import SDP_PDU, SDP_PSM
41from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
42
43# -----------------------------------------------------------------------------
44# Logging
45# -----------------------------------------------------------------------------
46logger = logging.getLogger(__name__)
47
48
49# -----------------------------------------------------------------------------
# Display names for the L2CAP PSM values that this module can label.
# Passed to name_or_number() when tracing data on a channel whose PSM has no
# dedicated decoder.
PSM_NAMES = {
    # TODO: add more PSM values
    RFCOMM_PSM: 'RFCOMM',
    SDP_PSM: 'SDP',
    AVDTP_PSM: 'AVDTP',
}
56
57
58# -----------------------------------------------------------------------------
class PacketTracer:
    """Trace HCI packets exchanged between a host and a controller.

    Two `Analyzer` instances are kept, one per direction, and linked to each
    other as peers. Packets handed to `trace()` are routed to one of them based
    on the `direction` argument. Each analyzer emits a labelled text line for
    every packet it sees and, for ACL data, for the higher-layer PDUs it is
    able to decode.
    """

    class AclStream:
        """ACL traffic for one connection handle, in one direction.

        HCI ACL packets are fed into a packet assembler which calls
        `on_acl_pdu()` with each PDU it produces. PDUs are then decoded
        according to their L2CAP channel ID and reported through the owning
        analyzer.
        """

        def __init__(self, analyzer):
            """Create a stream that reports through `analyzer`."""
            self.analyzer = analyzer
            self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
            self.avdtp_assemblers = {}  # AVDTP assemblers, by source_cid
            self.psms = {}  # PSM, by source_cid
            self.peer = None  # ACL stream in the other direction

        def on_acl_pdu(self, pdu):
            """Decode one PDU from the packet assembler and emit the result.

            ATT, SMP and L2CAP signaling channels are decoded directly. Any
            other channel is decoded based on the PSM learned from earlier
            signaling traffic, when one is known.
            """
            l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
            cid = l2cap_pdu.cid

            if cid == ATT_CID:
                self.analyzer.emit(ATT_PDU.from_bytes(l2cap_pdu.payload))
            elif cid == SMP_CID:
                self.analyzer.emit(SMP_Command.from_bytes(l2cap_pdu.payload))
            elif cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
                self._on_signaling_pdu(l2cap_pdu)
            else:
                self._on_channel_pdu(l2cap_pdu)

        def _on_signaling_pdu(self, l2cap_pdu):
            """Emit an L2CAP control frame and track channels that it opens."""
            control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload)
            self.analyzer.emit(control_frame)

            # A connection request announces the PSM for the requester's CID
            if control_frame.code == L2CAP_CONNECTION_REQUEST:
                self.psms[control_frame.source_cid] = control_frame.psm
                return

            # From here on, only successful connection responses matter
            if control_frame.code != L2CAP_CONNECTION_RESPONSE:
                return
            if control_frame.result != L2CAP_Connection_Response.CONNECTION_SUCCESSFUL:
                return
            if self.peer is None:
                return

            # The matching request, if any, was seen by the stream in the other
            # direction and recorded there under the same source CID
            psm = self.peer.psms.get(control_frame.source_cid)
            if not psm:
                return

            # Found a pending connection
            self.psms[control_frame.destination_cid] = psm

            # For AVDTP connections, create a packet assembler for each
            # direction. Each one is keyed by the CID that PDUs seen by that
            # stream will carry (see the lookup in _on_channel_pdu).
            if psm == AVDTP_PSM:
                self.avdtp_assemblers[
                    control_frame.source_cid
                ] = AVDTP_MessageAssembler(self.on_avdtp_message)
                self.peer.avdtp_assemblers[
                    control_frame.destination_cid
                ] = AVDTP_MessageAssembler(self.peer.on_avdtp_message)

        def _on_channel_pdu(self, l2cap_pdu):
            """Emit a PDU received on a channel other than ATT/SMP/signaling."""
            # Try to find the PSM associated with this PDU. The lookup uses the
            # peer stream's table: the CID in a PDU is matched against the CIDs
            # recorded from the other direction's signaling traffic.
            psm = self.peer.psms.get(l2cap_pdu.cid) if self.peer else None
            if not psm:
                # Unknown channel: emit the undecoded L2CAP PDU
                self.analyzer.emit(l2cap_pdu)
                return

            if psm == SDP_PSM:
                self.analyzer.emit(SDP_PDU.from_bytes(l2cap_pdu.payload))
            elif psm == RFCOMM_PSM:
                self.analyzer.emit(RFCOMM_Frame.from_bytes(l2cap_pdu.payload))
            elif psm == AVDTP_PSM:
                self.analyzer.emit(
                    f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
                    f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
                )
                # Complete messages are reported through on_avdtp_message()
                assembler = self.avdtp_assemblers.get(l2cap_pdu.cid)
                if assembler:
                    assembler.on_pdu(l2cap_pdu.payload)
            else:
                psm_string = name_or_number(PSM_NAMES, psm)
                self.analyzer.emit(
                    f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
                    f'PSM={psm_string}]: {l2cap_pdu.payload.hex()}'
                )

        def on_avdtp_message(self, transaction_label, message):
            """Emit an AVDTP message delivered by one of the AVDTP assemblers."""
            self.analyzer.emit(
                f'{color("AVDTP", "green")} [{transaction_label}] {message}'
            )

        def feed_packet(self, packet):
            """Pass an HCI ACL data packet to the packet assembler."""
            self.packet_assembler.feed_packet(packet)

    class Analyzer:
        """Analyze all the packets flowing in one direction.

        Every message is passed to `emit_message` as a string prefixed with
        `[label]`. ACL data packets are dispatched to a per-connection-handle
        `AclStream`, created on first use.
        """

        def __init__(self, label, emit_message):
            """Create an analyzer.

            Args:
                label: text placed in brackets in front of every emitted message.
                emit_message: callable invoked with each formatted message string.
            """
            self.label = label
            self.emit_message = emit_message
            self.acl_streams = {}  # ACL streams, by connection handle
            self.peer = None  # Analyzer in the other direction

        def start_acl_stream(self, connection_handle):
            """Create, register and return the ACL stream for a connection handle.

            If the peer analyzer already has a stream for the same handle, the
            two streams are linked to each other.
            """
            logger.info(
                f'[{self.label}] +++ Creating ACL stream for connection '
                f'0x{connection_handle:04X}'
            )
            stream = PacketTracer.AclStream(self)
            self.acl_streams[connection_handle] = stream

            # Associate with a peer stream if we can. The peer may not be set
            # when an analyzer is used on its own.
            if self.peer is not None:
                peer_stream = self.peer.acl_streams.get(connection_handle)
                if peer_stream is not None:
                    stream.peer = peer_stream
                    peer_stream.peer = stream

            return stream

        def end_acl_stream(self, connection_handle):
            """Remove the ACL streams for a connection handle, in both directions.

            The peer analyzer is always asked to clean up as well, even when
            this analyzer holds no stream for the handle. Otherwise a stream in
            the other direction would outlive the connection, and would be
            picked up again by a later connection using the same handle.
            """
            self._remove_acl_stream(connection_handle)
            if self.peer is not None:
                self.peer._remove_acl_stream(connection_handle)

        def _remove_acl_stream(self, connection_handle):
            """Drop this analyzer's own stream for a handle, if there is one."""
            if self.acl_streams.pop(connection_handle, None) is None:
                return
            logger.info(
                f'[{self.label} ] --- Removing ACL stream for connection '
                f'0x{connection_handle:04X}'
            )

        def on_packet(self, packet):
            """Emit an HCI packet, then update or feed the ACL stream state."""
            self.emit(packet)

            if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
                # Look for an existing stream for this handle, create one if it
                # is the first ACL packet for that connection handle
                stream = self.acl_streams.get(packet.connection_handle)
                if stream is None:
                    stream = self.start_acl_stream(packet.connection_handle)
                stream.feed_packet(packet)
            elif packet.hci_packet_type == HCI_EVENT_PACKET:
                if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
                    self.end_acl_stream(packet.connection_handle)

        def emit(self, message):
            """Send `message`, prefixed with this analyzer's label, to the sink."""
            self.emit_message(f'[{self.label}] {message}')

    def __init__(
        self,
        host_to_controller_label=color('HOST->CONTROLLER', 'blue'),
        controller_to_host_label=color('CONTROLLER->HOST', 'cyan'),
        emit_message=logger.info,
    ):
        """Create a tracer with one analyzer per direction.

        Args:
            host_to_controller_label: label for packets traced with direction 0.
            controller_to_host_label: label for packets traced with any other
                direction value.
            emit_message: callable invoked with each formatted message string
                (defaults to this module's `logger.info`).
        """
        self.host_to_controller_analyzer = PacketTracer.Analyzer(
            host_to_controller_label, emit_message
        )
        self.controller_to_host_analyzer = PacketTracer.Analyzer(
            controller_to_host_label, emit_message
        )

        # Link the two analyzers so that each can find the other's ACL streams
        self.host_to_controller_analyzer.peer = self.controller_to_host_analyzer
        self.controller_to_host_analyzer.peer = self.host_to_controller_analyzer

    def trace(self, packet, direction=0):
        """Analyze one HCI packet.

        Args:
            packet: the HCI packet to trace.
            direction: 0 for host-to-controller; any other value is treated as
                controller-to-host.
        """
        if direction == 0:
            self.host_to_controller_analyzer.on_packet(packet)
        else:
            self.controller_to_host_analyzer.on_packet(packet)
210