• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2021 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15#
16#
17# This script extracts LE Audio audio data from btsnoop.
18# Generates an audio dump file in which each frame consists of a two-byte
19# frame length followed by the coded frame.
20#
21# Audio File Name Format:
22# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_
23# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin
24#
25#
26# Usage:
27# ./dump_le_audio.py BTSNOOP.cfa [-v] [--header] [--ase_handle ASE_HANDLE]
28#
29# -v, --verbose: to enable the verbose log
30# --header: Add the header for LC3 Conformance Interoperability Test Software V.1.0.3 from LC3 test specification.
31#  --ase_handle ASE_HANDLE: Set the ASE handle manually.
32#
33# NOTE:
34# Please make sure your HCI snoop data file includes the following frames:
35# 1. GATT service discovery for the "ASE Control Point" characteristic (this step can be skipped if you pass the ASE handle with --ase_handle)
36# 2. GATT config codec via ASE Control Point
37# 3. HCI create CIS to point out the "Start stream", and the data frames.
38# After all HCI packets have been parsed, any remaining audio data is dumped as well.
39#
40# Corresponding specifications:
41# ASCS_1.0
42# PACS_1.0
43# BAP_1.0
44# LC3.TS V1.0.3
45#
46from collections import defaultdict
47from os import X_OK
48
49import argparse
50import struct
51import sys
52import time
53
# Path of the capture file.
# NOTE(review): not read anywhere in the visible code; main() keeps the path
# in a local variable.
BTSNOOP_FILE_NAME = ""
# The 16 bytes main() expects at the very start of the capture file.
BTSNOOP_HEADER = b'btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea'

# Packet type values; parse_next_packet() dispatches on the last byte of the
# 25 bytes it reads in front of every packet.
COMMADN_PACKET = 1
ACL_PACKET = 2
SCO_PACKET = 3
EVENT_PACKET = 4
ISO_PACKET = 5

# Values the per-record flags field is compared against to tell direction.
SENT = 0
RECEIVED = 1

# L2CAP channel id whose payload is handed to the ATT parser.
L2CAP_ATT_CID = 4

# opcode for att protocol
OPCODE_ATT_READ_BY_TYPE_RSP = 0x09
OPCODE_ATT_WRITE_CMD = 0x52

# 16-bit UUID looked for in Read By Type responses.
UUID_ASE_CONTROL_POINT = 0x2BC6

# opcode for ase control
OPCODE_CONFIG_CODEC = 0x01
OPCODE_ENABLE = 0x03
OPCODE_RELEASE = 0x08

# opcode for hci command
OPCODE_HCI_CREATE_CIS = 0x2064
OPCODE_REMOVE_ISO_DATA_PATH = 0x206F
OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F
OPCODE_LE_CREATE_BIG = 0x2068
OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E

# HCI event
EVENT_CODE_LE_META_EVENT = 0x3E
SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B

# Metadata LTV type read from the ASE "Enable" operation.
TYPE_STREAMING_AUDIO_CONTEXTS = 0x02

# Codec specific configuration LTV types.
TYPE_SAMPLING_FREQUENCIES = 0x01
TYPE_FRAME_DURATION = 0x02
TYPE_CHANNEL_ALLOCATION = 0x03
TYPE_OCTETS_PER_FRAME = 0x04

# Streaming audio context values that get a readable name in output file names.
CONTEXT_TYPE_CONVERSATIONAL = 0x0002
CONTEXT_TYPE_MEDIA = 0x0004
CONTEXT_TYPE_RINGTONE = 0x0200

# sample frequency
SAMPLE_FREQUENCY_8000 = 0x01
SAMPLE_FREQUENCY_11025 = 0x02
SAMPLE_FREQUENCY_16000 = 0x03
SAMPLE_FREQUENCY_22050 = 0x04
SAMPLE_FREQUENCY_24000 = 0x05
SAMPLE_FREQUENCY_32000 = 0x06
SAMPLE_FREQUENCY_44100 = 0x07
SAMPLE_FREQUENCY_48000 = 0x08
SAMPLE_FREQUENCY_88200 = 0x09
SAMPLE_FREQUENCY_96000 = 0x0a
SAMPLE_FREQUENCY_176400 = 0x0b
SAMPLE_FREQUENCY_192000 = 0x0c
SAMPLE_FREQUENCY_384000 = 0x0d

# Frame duration codes (7.5 ms and 10 ms).
FRAME_DURATION_7_5 = 0x00
FRAME_DURATION_10 = 0x01

# Channel allocation values that the dump functions know by name.
AUDIO_LOCATION_MONO = 0x00
AUDIO_LOCATION_LEFT = 0x01
AUDIO_LOCATION_RIGHT = 0x02
AUDIO_LOCATION_CENTER = 0x04

# AD type and service UUID that identify a basic audio announcement in
# periodic advertising data.
AD_TYPE_SERVICE_DATA_16_BIT = 0x16
BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851

# Running count of parsed records; used as the prefix of debug_print() output.
packet_number = 0
# Set by -v/--verbose.
debug_enable = False
# Set by --header.
add_header = False
# Set by --ase_handle; 0xFFFF means "not given on the command line".
ase_handle = 0xFFFF
131
132
class Connection:
    """Unicast state gathered for one ACL connection.

    Holds the ASE Control Point handle, the per-ASE codec configuration, the
    CIS handle bound to the connection and the audio bytes collected in each
    direction until they are written to disk.
    """

    def __init__(self):
        # 0xFFFF is the initial value of every handle-like field.
        self.ase_handle = self.context = self.cis_handle = 0xFFFF
        self.number_of_ases = 0
        # ase id -> AseStream, created on first access.
        self.ase = defaultdict(AseStream)
        # Two independent buffers, one per direction.
        self.input_dump = []
        self.output_dump = []
        self.start_time = 0xFFFFFFFF

    def dump(self):
        """Print every field, followed by the dump of each known ASE."""
        summary = (
            ("start_time: ", self.start_time),
            ("ase_handle: ", self.ase_handle),
            ("context type: ", self.context),
            ("number_of_ases:  ", self.number_of_ases),
            ("cis_handle:  ", self.cis_handle),
        )
        for label, value in summary:
            print(label + str(value))
        for ase_id in self.ase:
            print("ase id: " + str(ase_id))
            self.ase[ase_id].dump()
154
155
class AseStream:
    """Codec configuration of a single ASE.

    The fields keep their initial values until parse_codec_information()
    stores what it read from a Config Codec operation.
    """

    def __init__(self):
        self.sampling_frequencies = self.frame_duration = 0xFF
        self.channel_allocation = 0xFFFFFFFF
        self.octets_per_frame = 0xFFFF

    def dump(self):
        """Print the four codec fields, one per line."""
        for label, value in (
            ("sampling_frequencies: ", self.sampling_frequencies),
            ("frame_duration: ", self.frame_duration),
            ("channel_allocation: ", self.channel_allocation),
            ("octets_per_frame: ", self.octets_per_frame),
        ):
            print(label + str(value))
169
170
class Broadcast:
    """Broadcast state gathered for one advertising handle."""

    def __init__(self):
        # subgroup number -> number of BISes in that subgroup
        self.num_of_bis = defaultdict(int)
        # BIS index -> BisStream holding its codec configuration
        self.bis = defaultdict(BisStream)
        # BIS index -> BIS handle
        self.bis_index_handle_map = defaultdict(int)
        # BIS indices in the order they were announced
        self.bis_index_list = []

    def dump(self):
        """Print each BIS index with its handle, then that stream's fields."""
        for bis_index in self.bis:
            handle = self.bis_index_handle_map[bis_index]
            print("bis_index: " + str(bis_index) + " bis handle: " + str(handle))
            self.bis[bis_index].dump()
183
184
class BisStream:
    """Codec configuration and collected audio of a single BIS."""

    def __init__(self):
        self.sampling_frequencies = self.frame_duration = 0xFF
        self.channel_allocation = self.start_time = 0xFFFFFFFF
        self.octets_per_frame = 0xFFFF
        # Length-prefixed frames collected for this BIS until they are written out.
        self.output_dump = []

    def dump(self):
        """Print the start time and the four codec fields, one per line."""
        for label, value in (
            ("start_time: ", self.start_time),
            ("sampling_frequencies: ", self.sampling_frequencies),
            ("frame_duration: ", self.frame_duration),
            ("channel_allocation: ", self.channel_allocation),
            ("octets_per_frame: ", self.octets_per_frame),
        ):
            print(label + str(value))
201
202
# ACL connection handle -> Connection; entries are created on first access.
connection_map = defaultdict(Connection)
# CIS handle -> ACL connection handle, filled from HCI Create CIS commands.
cis_acl_map = defaultdict(int)
# Advertising handle -> Broadcast, filled from periodic advertising data.
broadcast_map = defaultdict(Broadcast)
# BIG handle -> advertising handle, filled from HCI LE Create BIG commands.
big_adv_map = defaultdict(int)
# BIS handle -> BisStream, filled from LE Create BIG Complete events.
bis_stream_map = defaultdict(BisStream)
208
209
def generate_header(file, stream, is_cis):
    """Write the LC3 test-software file header for ``stream`` to ``file``.

    The header starts with the fixed bytes ``1c cc 12 00`` and is followed by
    little-endian fields: sampling frequency / 100, bit rate / 100, channel
    count, frame duration in units of 10 us, a zero field and the constant
    48000000.

    Args:
        file: Binary file object opened for writing.
        stream: A ``Connection`` when ``is_cis`` is true (the first ASE in
            ``stream.ase`` supplies the codec fields), otherwise an object
            carrying the codec fields directly (``BisStream``).
        is_cis: Selects which of the two layouts ``stream`` has.

    Raises:
        KeyError: A codec field holds a value missing from the lookup tables
            (for example a field that was never configured).
    """
    sf_case = {
        SAMPLE_FREQUENCY_8000: 80,
        SAMPLE_FREQUENCY_11025: 110,
        SAMPLE_FREQUENCY_16000: 160,
        SAMPLE_FREQUENCY_22050: 220,
        SAMPLE_FREQUENCY_24000: 240,
        SAMPLE_FREQUENCY_32000: 320,
        SAMPLE_FREQUENCY_44100: 441,
        SAMPLE_FREQUENCY_48000: 480,
        SAMPLE_FREQUENCY_88200: 882,
        SAMPLE_FREQUENCY_96000: 960,
        SAMPLE_FREQUENCY_176400: 1764,
        SAMPLE_FREQUENCY_192000: 1920,
        # 384000 / 100; the previous value 2840 did not follow the pattern.
        SAMPLE_FREQUENCY_384000: 3840,
    }
    fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10}
    # NOTE(review): CENTER maps to 2 channels while every other single
    # location maps to 1 -- kept as found, please confirm.
    al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2}

    if is_cis:
        # Only the first ASE of the connection describes the stream; with no
        # ASE at all just the four fixed bytes are written.
        codec = next(iter(stream.ase.values()), None)
    else:
        codec = stream

    header = bytearray.fromhex('1ccc1200')
    if codec is not None:
        frame_duration_ms = fd_case[codec.frame_duration]
        header += struct.pack("<H", sf_case[codec.sampling_frequencies])
        header += struct.pack("<H", int(codec.octets_per_frame * 8 * 10 / frame_duration_ms))
        # int(): 7.5 * 100 is the float 750.0, which struct refuses for "H".
        header += struct.pack("<HHHL", al_case[codec.channel_allocation], int(frame_duration_ms * 100), 0,
                              48000000)
    file.write(header)
243
244
def parse_codec_information(connection_handle, ase_id, packet):
    """Store the codec specific configuration of one ASE.

    ``packet`` starts with a one-byte total length followed by that many
    bytes of length/type/value entries. Recognised entries are copied into
    ``connection_map[connection_handle].ase[ase_id]``.

    Args:
        connection_handle: ACL connection handle owning the ASE.
        ase_id: Identifier of the ASE being configured.
        packet: Bytes positioned at the configuration length field.

    Returns:
        The bytes following the configuration. When the declared length
        exceeds the available data, the bytes after the length field are
        returned unparsed.
    """
    length, packet = unpack_data(packet, 1, False)
    if len(packet) < length:
        debug_print("Invalid codec configuration length")
        return packet
    ase = connection_map[connection_handle].ase[ase_id]

    # Work on exactly the declared span so a malformed entry can never eat
    # into whatever follows the configuration.
    config, remainder = packet[:length], packet[length:]
    while config:
        config_length, config = unpack_data(config, 1, False)
        if config_length == 0:
            # Empty entry: there is no type byte to read.
            continue
        if config_length > len(config):
            debug_print("Invalid codec configuration element length")
            break
        config_type, config = unpack_data(config, 1, False)
        value, config = unpack_data(config, config_length - 1, False)
        if config_type == TYPE_SAMPLING_FREQUENCIES:
            ase.sampling_frequencies = value
        elif config_type == TYPE_FRAME_DURATION:
            ase.frame_duration = value
        elif config_type == TYPE_CHANNEL_ALLOCATION:
            ase.channel_allocation = value
        elif config_type == TYPE_OCTETS_PER_FRAME:
            ase.octets_per_frame = value

    return remainder
266
267
def parse_att_read_by_type_rsp(packet, connection_handle):
    """Look for the ASE Control Point in an ATT Read By Type response.

    Only responses whose per-record length is 7 are examined: attribute
    handle (2), one skipped byte, value handle (2) and a 16-bit UUID (2).
    Every record of the response is checked; when one carries the ASE Control
    Point UUID its value handle is stored on the connection.

    Args:
        packet: ATT payload following the opcode byte.
        connection_handle: ACL connection handle the response arrived on.
    """
    length, packet = unpack_data(packet, 1, False)
    if length != 7:
        #ignore the packet, we're only interested in this packet for the characteristic type UUID
        return

    if length > len(packet):
        debug_print("Invalid att packet length")
        return

    # The response is a list of fixed-size records; the characteristic we
    # want is not necessarily the first one.
    while len(packet) >= length:
        record, packet = packet[:length], packet[length:]
        attribute_handle, record = unpack_data(record, 2, False)
        if debug_enable:
            debug_print("attribute_handle - " + str(attribute_handle))
        record = unpack_data(record, 1, True)
        value_handle, record = unpack_data(record, 2, False)
        characteristic_uuid, record = unpack_data(record, 2, False)
        if characteristic_uuid == UUID_ASE_CONTROL_POINT:
            debug_print("ASE Control point found!")
            connection_map[connection_handle].ase_handle = value_handle
            return
287
288
def parse_att_write_cmd(packet, connection_handle, timestamp):
    """Handle an ATT Write Command addressed to the ASE Control Point.

    Writes to any other attribute are ignored. For "Config Codec" the codec
    configuration of every listed ASE is stored; for "Enable" the streaming
    audio context is taken from the metadata and ``timestamp`` is recorded as
    the stream start time.

    Args:
        packet: ATT payload following the opcode byte.
        connection_handle: ACL connection handle the write was sent on.
        timestamp: Capture timestamp of the packet.
    """
    attribute_handle, packet = unpack_data(packet, 2, False)
    connection = connection_map[connection_handle]
    # A handle given on the command line overrides service discovery.
    if ase_handle != 0xFFFF:
        connection.ase_handle = ase_handle

    if connection.ase_handle != attribute_handle:
        return

    if debug_enable:
        debug_print("Action with ASE Control point")
    opcode, packet = unpack_data(packet, 1, False)
    if opcode == OPCODE_CONFIG_CODEC:
        debug_print("config_codec")
        (connection.number_of_ases, packet) = unpack_data(packet, 1, False)
        for _ in range(connection.number_of_ases):
            ase_id, packet = unpack_data(packet, 1, False)
            # ignore target_latency, target_phy, codec_id
            packet = unpack_data(packet, 7, True)
            packet = parse_codec_information(connection_handle, ase_id, packet)
    elif opcode == OPCODE_ENABLE:
        debug_print("enable")
        numbers_of_ases, packet = unpack_data(packet, 1, False)
        context_found = False
        for _ in range(numbers_of_ases):
            _ase_id, packet = unpack_data(packet, 1, False)
            metadata_length, packet = unpack_data(packet, 1, False)
            if metadata_length > len(packet):
                debug_print("Invalid metadata length")
                return
            # Slice out this ASE's metadata so the next ASE entry is found
            # even when the metadata is empty or holds other entries first.
            metadata, packet = packet[:metadata_length], packet[metadata_length:]
            while metadata:
                length, metadata = unpack_data(metadata, 1, False)
                if length > len(metadata):
                    debug_print("Invalid metadata value length")
                    return
                if length == 0:
                    continue
                metadata_type, metadata = unpack_data(metadata, 1, False)
                value, metadata = metadata[:length - 1], metadata[length - 1:]
                if metadata_type == TYPE_STREAMING_AUDIO_CONTEXTS and len(value) >= 2:
                    connection.context, _ = unpack_data(value, 2, False)
                    context_found = True
                    break
            if context_found:
                # The first context found describes the connection.
                break

        connection.start_time = timestamp
        if debug_enable:
            connection.dump()
328
329
def parse_att_packet(packet, connection_handle, flags, timestamp):
    """Route an ATT PDU to its handler based on opcode and direction.

    Received Read By Type responses and sent Write Commands are handled;
    every other opcode/direction combination is ignored.
    """
    att_opcode, payload = unpack_data(packet, 1, False)
    if att_opcode == OPCODE_ATT_READ_BY_TYPE_RSP and flags == RECEIVED:
        parse_att_read_by_type_rsp(payload, connection_handle)
    elif att_opcode == OPCODE_ATT_WRITE_CMD and flags == SENT:
        parse_att_write_cmd(payload, connection_handle, timestamp)
337
338
def parse_big_codec_information(adv_handle, packet):
    """Parse the codec information of a basic audio announcement.

    For every subgroup the codec configuration is read, the metadata is
    skipped, and each listed BIS gets the subgroup configuration (plus its
    own channel allocation, when present) stored in
    ``broadcast_map[adv_handle].bis``.

    A channel allocation may appear at subgroup level or per BIS; when
    neither is present the BIS is treated as mono. Other codec fields that
    the announcement does not carry keep the ``BisStream`` defaults.

    Args:
        adv_handle: Advertising handle the announcement belongs to.
        packet: Bytes following the service UUID of the announcement.

    Returns:
        The bytes left after the last subgroup, or ``None`` when a length
        field is inconsistent with the available data.
    """
    # Ignore presentation delay
    packet = unpack_data(packet, 3, True)
    number_of_subgroup, packet = unpack_data(packet, 1, False)
    for subgroup in range(number_of_subgroup):
        broadcast = broadcast_map[adv_handle]
        num_of_bis, packet = unpack_data(packet, 1, False)
        broadcast.num_of_bis[subgroup] = num_of_bis
        # Ignore codec id
        packet = unpack_data(packet, 5, True)
        length, packet = unpack_data(packet, 1, False)
        if len(packet) < length:
            print("Invalid subgroup codec information length")
            return

        # None means "not present in this announcement".
        sampling_frequencies = frame_duration = octets_per_frame = None
        subgroup_channel_allocation = AUDIO_LOCATION_MONO
        config, packet = packet[:length], packet[length:]
        while config:
            config_length, config = unpack_data(config, 1, False)
            if config_length == 0:
                continue
            if config_length > len(config):
                print("Invalid subgroup codec information length")
                return
            config_type, config = unpack_data(config, 1, False)
            value, config = unpack_data(config, config_length - 1, False)
            if config_type == TYPE_SAMPLING_FREQUENCIES:
                sampling_frequencies = value
            elif config_type == TYPE_FRAME_DURATION:
                frame_duration = value
            elif config_type == TYPE_OCTETS_PER_FRAME:
                octets_per_frame = value
            elif config_type == TYPE_CHANNEL_ALLOCATION:
                subgroup_channel_allocation = value
            else:
                print("Unknown config type")

        # Ignore metadata
        metadata_length, packet = unpack_data(packet, 1, False)
        packet = unpack_data(packet, metadata_length, True)

        for _ in range(num_of_bis):
            bis_index, packet = unpack_data(packet, 1, False)
            # The announcement is normally repeated; keep one entry per BIS.
            if bis_index not in broadcast.bis_index_list:
                broadcast.bis_index_list.append(bis_index)
            length, packet = unpack_data(packet, 1, False)
            if len(packet) < length:
                print("Invalid level 3 codec information length")
                return

            # Start from the subgroup value so nothing leaks from the
            # previously parsed BIS.
            channel_allocation = subgroup_channel_allocation
            config, packet = packet[:length], packet[length:]
            while config:
                config_length, config = unpack_data(config, 1, False)
                if config_length == 0:
                    continue
                if config_length > len(config):
                    print("Invalid level 3 codec information length")
                    return
                config_type, config = unpack_data(config, 1, False)
                value, config = unpack_data(config, config_length - 1, False)
                if config_type == TYPE_CHANNEL_ALLOCATION:
                    channel_allocation = value
                else:
                    print("Ignored config type")

            bis = broadcast.bis[bis_index]
            if sampling_frequencies is not None:
                bis.sampling_frequencies = sampling_frequencies
            if frame_duration is not None:
                bis.frame_duration = frame_duration
            if octets_per_frame is not None:
                bis.octets_per_frame = octets_per_frame
            bis.channel_allocation = channel_allocation

    return packet
395
396
def debug_print(log):
    """Print ``log`` prefixed with ``#<packet_number>: ``."""
    prefix = "#" + str(packet_number) + ": "
    print(prefix + log)
400
401
def unpack_data(data, byte, ignore):
    """Consume ``byte`` bytes from the front of ``data``.

    Args:
        data: Bytes to read from.
        byte: Number of bytes to consume. Values below zero are treated as
            zero.
        ignore: When true the bytes are skipped without being decoded.

    Returns:
        ``data[byte:]`` when ``ignore`` is true; otherwise a tuple
        ``(value, data[byte:])`` where ``value`` is the consumed bytes
        decoded as an unsigned little-endian integer (0 for a width of 0).

    Raises:
        struct.error: A width of 1, 2 or 4 was requested and ``data`` holds
            fewer bytes than that.
    """
    # A negative width would turn the slices below into "from the end".
    if byte < 0:
        byte = 0
    if ignore:
        return data[byte:]

    fixed_formats = {1: "<B", 2: "<H", 4: "<I"}
    if byte in fixed_formats:
        value = struct.unpack(fixed_formats[byte], data[:byte])[0]
    else:
        # Any other width (e.g. 3-byte fields) is decoded generically.
        value = int.from_bytes(data[:byte], "little")
    return value, data[byte:]
414
415
def parse_command_packet(packet, timestamp):
    """Process one HCI command packet.

    Handled opcodes:
      * Create CIS: remember which ACL connection each CIS belongs to.
      * Remove ISO Data Path: write the audio collected for that stream.
      * LE Set Periodic Advertising Data: parse a basic audio announcement.
      * LE Create BIG: remember which advertising handle the BIG uses.
      * LE Setup ISO Data Path: record the start time of a known BIS.

    Args:
        packet: HCI command bytes starting at the opcode.
        timestamp: Capture timestamp of the packet.
    """
    opcode, packet = unpack_data(packet, 2, False)
    if opcode == OPCODE_HCI_CREATE_CIS:
        debug_print("OPCODE_HCI_CREATE_CIS")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return
        cis_count, packet = unpack_data(packet, 1, False)
        # Stays None when the command lists no CIS at all.
        acl_handle = None
        for _ in range(cis_count):
            cis_handle, packet = unpack_data(packet, 2, False)
            # NOTE(review): 0x0EFF also clears bit 8 of the handle; a 12-bit
            # mask would be 0x0FFF. Left as is because parse_iso_packet()
            # looks handles up with the same mask.
            cis_handle &= 0x0EFF
            acl_handle, packet = unpack_data(packet, 2, False)
            connection_map[acl_handle].cis_handle = cis_handle
            cis_acl_map[cis_handle] = acl_handle

        if debug_enable and acl_handle is not None:
            connection_map[acl_handle].dump()
    elif opcode == OPCODE_REMOVE_ISO_DATA_PATH:
        debug_print("OPCODE_REMOVE_ISO_DATA_PATH")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return

        iso_handle, packet = unpack_data(packet, 2, False)
        # CIS stream
        if iso_handle in cis_acl_map:
            acl_handle = cis_acl_map[iso_handle]
            dump_cis_audio_data_to_file(acl_handle)
        # BIS stream
        elif iso_handle in bis_stream_map:
            dump_bis_audio_data_to_file(iso_handle)
    elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA:
        debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return

        if length < 21:
            debug_print("Ignored. Not basic audio announcement")
            return

        adv_hdl, packet = unpack_data(packet, 1, False)
        #ignore operation, advertising_data_length
        packet = unpack_data(packet, 2, True)
        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid AD element length")
            return

        ad_type, packet = unpack_data(packet, 1, False)
        service, packet = unpack_data(packet, 2, False)
        if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
            debug_print("Ignored. Not basic audio announcement")
            return

        parse_big_codec_information(adv_hdl, packet)
    elif opcode == OPCODE_LE_CREATE_BIG:
        debug_print("OPCODE_LE_CREATE_BIG")

        length, packet = unpack_data(packet, 1, False)
        # Either condition alone makes the command unusable; the previous
        # "and" let truncated commands through.
        if length != len(packet) or length < 31:
            debug_print("Invalid Create BIG command length")
            return

        big_handle, packet = unpack_data(packet, 1, False)
        adv_handle, packet = unpack_data(packet, 1, False)
        big_adv_map[big_handle] = adv_handle
    elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH:
        debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH")
        length, packet = unpack_data(packet, 1, False)
        if len(packet) != length:
            debug_print("Invalid LE SETUP ISO DATA PATH command length")
            return

        iso_handle, packet = unpack_data(packet, 2, False)
        if iso_handle in bis_stream_map:
            bis_stream_map[iso_handle].start_time = timestamp
499
500
def parse_event_packet(packet):
    """Process one HCI event packet.

    Only a successful LE Create BIG Complete event is handled: each BIS
    handle it reports is paired, by position, with the BIS indices collected
    from the announcement of the same advertising handle, and the matching
    ``BisStream`` is registered in ``bis_stream_map`` under that handle.

    Args:
        packet: HCI event bytes starting at the event code.
    """
    event_code, packet = unpack_data(packet, 1, False)
    if event_code != EVENT_CODE_LE_META_EVENT:
        return

    length, packet = unpack_data(packet, 1, False)
    if len(packet) != length:
        print("Invalid LE mata event length")
        return

    subevent_code, packet = unpack_data(packet, 1, False)
    if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE:
        return

    status, packet = unpack_data(packet, 1, False)
    if status != 0x00:
        debug_print("Create_BIG failed")
        return

    big_handle, packet = unpack_data(packet, 1, False)
    if big_handle not in big_adv_map:
        print("Invalid BIG handle")
        return
    adv_handle = big_adv_map[big_handle]
    broadcast = broadcast_map[adv_handle]
    # Ignore, we don't care these parameter
    packet = unpack_data(packet, 15, True)
    num_of_bis, packet = unpack_data(packet, 1, False)
    for count in range(num_of_bis):
        bis_handle, packet = unpack_data(packet, 2, False)
        if count >= len(broadcast.bis_index_list):
            # The announcement was not captured or listed fewer BISes, so
            # there is no codec configuration to attach to this handle.
            debug_print("No BIS index known for BIS handle " + str(bis_handle))
            continue
        bis_index = broadcast.bis_index_list[count]
        broadcast.bis_index_handle_map[bis_index] = bis_handle
        bis_stream_map[bis_handle] = broadcast.bis[bis_index]
533
534
def convert_time_str(timestamp):
    """Format a capture timestamp as ``MM_DD__HH_MM_SS_uuuuuu`` in local time.

    Args:
        timestamp: Time in microseconds. Values at or above the btsnoop epoch
            offset are taken as microseconds since 0 AD (the btsnoop
            convention) and converted to Unix time first; smaller values are
            used as they are.

    Returns:
        The formatted string; the last field is the microsecond part padded
        to six digits.
    """
    # Microseconds between 0 AD (btsnoop epoch) and 1970-01-01 (Unix epoch).
    btsnoop_epoch_delta_us = 0x00DCDDB30F2F8000

    timestamp = int(timestamp)
    if timestamp >= btsnoop_epoch_delta_us:
        timestamp -= btsnoop_epoch_delta_us

    # Integer arithmetic: a float cannot hold these values to the microsecond
    # and rounding could produce a seven-digit fraction.
    seconds, microseconds = divmod(timestamp, 1000000)
    local_timestamp = time.localtime(seconds)

    str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
    return str_format + "_" + "{0:06}".format(microseconds)
546
547
def dump_cis_audio_data_to_file(acl_handle):
    """Write the audio collected for a unicast connection to disk.

    The file name is built from the audio context, the codec configuration of
    the connection's first ASE and the stream start time; ``_input.bin`` and
    ``_output.bin`` are written for the directions that hold data, and the
    corresponding buffers are emptied afterwards. Codec values without a
    known name appear as ``unknown`` (channel allocation: its hex value)
    instead of aborting the dump.

    Args:
        acl_handle: ACL connection handle whose buffers are written.
    """
    connection = connection_map[acl_handle]
    if debug_enable:
        connection.dump()

    context_case = {
        CONTEXT_TYPE_CONVERSATIONAL: "Conversational",
        CONTEXT_TYPE_MEDIA: "Media",
        CONTEXT_TYPE_RINGTONE: "Ringtone"
    }
    sf_case = {
        SAMPLE_FREQUENCY_8000: "8000",
        SAMPLE_FREQUENCY_11025: "11025",
        SAMPLE_FREQUENCY_16000: "16000",
        SAMPLE_FREQUENCY_22050: "22050",
        SAMPLE_FREQUENCY_24000: "24000",
        SAMPLE_FREQUENCY_32000: "32000",
        SAMPLE_FREQUENCY_44100: "44100",
        SAMPLE_FREQUENCY_48000: "48000",
        SAMPLE_FREQUENCY_88200: "88200",
        SAMPLE_FREQUENCY_96000: "96000",
        SAMPLE_FREQUENCY_176400: "176400",
        SAMPLE_FREQUENCY_192000: "192000",
        SAMPLE_FREQUENCY_384000: "384000"
    }
    fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
    al_case = {
        AUDIO_LOCATION_MONO: "mono",
        AUDIO_LOCATION_LEFT: "left",
        AUDIO_LOCATION_RIGHT: "right",
        AUDIO_LOCATION_CENTER: "center"
    }

    file_name = context_case.get(connection.context, "Unknown")
    # Only the first ASE contributes to the name.
    ase = next(iter(connection.ase.values()), None)
    if ase is not None:
        file_name += "_sf" + sf_case.get(ase.sampling_frequencies, "unknown")
        file_name += "_fd" + fd_case.get(ase.frame_duration, "unknown")
        file_name += "_" + al_case.get(ase.channel_allocation, "0x{:X}".format(ase.channel_allocation))
        file_name += "_frame" + str(ase.octets_per_frame)
        file_name += "_" + convert_time_str(connection.start_time)

    if connection.input_dump:
        debug_print("Dump unicast input...")
        # "with" closes the file even if writing the header fails.
        with open(file_name + "_input.bin", 'wb') as f:
            if add_header:
                generate_header(f, connection, True)
            f.write(bytearray(connection.input_dump))
        connection.input_dump = []

    if connection.output_dump:
        debug_print("Dump unicast output...")
        with open(file_name + "_output.bin", 'wb') as f:
            if add_header:
                generate_header(f, connection, True)
            f.write(bytearray(connection.output_dump))
        connection.output_dump = []

    return
609
610
def dump_bis_audio_data_to_file(iso_handle):
    """Write the audio collected for one BIS to disk.

    The file name starts with ``broadcast`` and continues with the codec
    configuration and start time of the stream; ``_output.bin`` is written
    when the stream holds data, and the buffer is emptied afterwards. Codec
    values without a known name appear as ``unknown`` (channel allocation:
    its hex value) instead of aborting the dump.

    Args:
        iso_handle: BIS handle whose buffer is written.
    """
    stream = bis_stream_map[iso_handle]
    if debug_enable:
        stream.dump()

    sf_case = {
        SAMPLE_FREQUENCY_8000: "8000",
        SAMPLE_FREQUENCY_11025: "11025",
        SAMPLE_FREQUENCY_16000: "16000",
        SAMPLE_FREQUENCY_22050: "22050",
        SAMPLE_FREQUENCY_24000: "24000",
        SAMPLE_FREQUENCY_32000: "32000",
        SAMPLE_FREQUENCY_44100: "44100",
        SAMPLE_FREQUENCY_48000: "48000",
        SAMPLE_FREQUENCY_88200: "88200",
        SAMPLE_FREQUENCY_96000: "96000",
        SAMPLE_FREQUENCY_176400: "176400",
        SAMPLE_FREQUENCY_192000: "192000",
        SAMPLE_FREQUENCY_384000: "384000"
    }
    fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
    al_case = {
        AUDIO_LOCATION_MONO: "mono",
        AUDIO_LOCATION_LEFT: "left",
        AUDIO_LOCATION_RIGHT: "right",
        AUDIO_LOCATION_CENTER: "center"
    }

    file_name = "broadcast"
    file_name += "_sf" + sf_case.get(stream.sampling_frequencies, "unknown")
    file_name += "_fd" + fd_case.get(stream.frame_duration, "unknown")
    file_name += "_" + al_case.get(stream.channel_allocation, "0x{:X}".format(stream.channel_allocation))
    file_name += "_frame" + str(stream.octets_per_frame)
    file_name += "_" + convert_time_str(stream.start_time)

    if stream.output_dump:
        debug_print("Dump broadcast output...")
        # "with" closes the file even if writing the header fails.
        with open(file_name + "_output.bin", 'wb') as f:
            if add_header:
                generate_header(f, stream, False)
            f.write(bytearray(stream.output_dump))
        stream.output_dump = []

    return
654
655
def parse_acl_packet(packet, flags, timestamp):
    """Validate an HCI ACL packet and pass ATT traffic on to the ATT parser.

    The packet must hold at least the 4-byte HCI header and the 4-byte L2CAP
    header, and both length fields must match the bytes that follow them.
    Payloads on channels other than the ATT channel are dropped.
    """
    if len(packet) < 8:
        debug_print("Invalid acl data length.")
        return

    handle_field, remaining = unpack_data(packet, 2, False)
    # The low 12 bits carry the connection handle.
    connection_handle = handle_field & 0x0FFF
    if connection_handle > 0x0EFF:
        debug_print("Invalid packet handle, skip")
        return

    total_length, remaining = unpack_data(remaining, 2, False)
    if len(remaining) != total_length:
        debug_print("Invalid total length, skip")
        return

    pdu_length, remaining = unpack_data(remaining, 2, False)
    channel_id, payload = unpack_data(remaining, 2, False)
    if len(payload) != pdu_length:
        debug_print("Invalid pdu length, skip")
        return

    if debug_enable:
        debug_print("ACL connection_handle - " + str(connection_handle))

    if channel_id != L2CAP_ATT_CID:
        return
    parse_att_packet(payload, connection_handle, flags, timestamp)
683
684
def parse_iso_packet(packet, flags):
    """Collect the SDU of one HCI ISO data packet.

    The SDU is appended, preceded by its two-byte little-endian length, to
    the buffer of the stream the handle belongs to: the input or output
    buffer of a unicast connection depending on ``flags``, or the output
    buffer of a broadcast stream. Packets for unknown handles are dropped.

    Only packets that carry a complete SDU are accepted; a fragment fails the
    SDU length check and is skipped.

    Args:
        packet: HCI ISO bytes starting at the handle field.
        flags: Direction of the packet (``SENT`` or ``RECEIVED``).
    """
    if len(packet) < 4:
        debug_print("Invalid iso data load length")
        return

    handle_field, packet = unpack_data(packet, 2, False)
    # Bit 14 of the handle field tells whether a 4-octet time stamp precedes
    # the sequence number.
    has_timestamp = bool(handle_field & 0x4000)
    # NOTE(review): CIS handles are stored with a 0x0EFF mask by the Create
    # CIS handler, so the same mask is used for that lookup. BIS handles are
    # stored unmasked and therefore need the full 12 bits.
    cis_handle = handle_field & 0x0EFF
    bis_handle = handle_field & 0x0FFF

    iso_data_load_length, packet = unpack_data(packet, 2, False)
    # The length occupies the low 14 bits.
    iso_data_load_length &= 0x3FFF
    if iso_data_load_length != len(packet):
        debug_print("Invalid iso data load length")
        return

    # Skip the optional time stamp (4) and the sequence number (2).
    skipped = 6 if has_timestamp else 2
    if len(packet) < skipped + 2:
        debug_print("Invalid iso sdu length")
        return
    packet = unpack_data(packet, skipped, True)
    iso_sdu_length, packet = unpack_data(packet, 2, False)
    # The top bits of this field carry status flags, not length.
    iso_sdu_length &= 0x0FFF
    if len(packet) == 0:
        debug_print("The iso data is empty")
    elif iso_sdu_length != len(packet):
        debug_print("Invalid iso sdu length")
        return

    frame = struct.pack("<H", len(packet)) + bytes(packet)

    # CIS stream
    if cis_handle in cis_acl_map:
        connection = connection_map[cis_acl_map[cis_handle]]
        if flags == SENT:
            connection.output_dump.extend(frame)
        elif flags == RECEIVED:
            connection.input_dump.extend(frame)
    elif bis_handle in bis_stream_map:
        bis_stream_map[bis_handle].output_dump.extend(frame)
714
715
def parse_next_packet(btsnoop_file):
    """Read one record from the capture and hand it to the matching parser.

    Each record is 25 header bytes (original length, captured length, flags,
    dropped-packet count, timestamp, packet type) followed by the packet
    itself; the two lengths include the packet type byte.

    Returns:
        True when a record was read and dispatched; False at end of file or
        when the record cannot be used (truncated, filtered capture or a
        non-zero dropped-packet count), which ends parsing.
    """
    global packet_number
    packet_number += 1

    record_header = btsnoop_file.read(25)
    if len(record_header) != 25:
        return False

    fields = struct.unpack(">IIIIqB", record_header)
    length_original, length_captured, flags, dropped_packets, timestamp, packet_type = fields

    if length_captured != length_original:
        debug_print("Filtered btnsoop, can not be parsed")
        return False

    # The type byte was already consumed as part of the header.
    packet = btsnoop_file.read(length_captured - 1)
    if len(packet) != length_original - 1:
        debug_print("Invalid packet length!")
        return False

    if dropped_packets:
        debug_print("Invalid droped value")
        return False

    if packet_type == COMMADN_PACKET:
        parse_command_packet(packet, timestamp)
    elif packet_type == ACL_PACKET:
        parse_acl_packet(packet, flags, timestamp)
    elif packet_type == EVENT_PACKET:
        parse_event_packet(packet)
    elif packet_type == ISO_PACKET:
        parse_iso_packet(packet, flags)
    # SCO and unknown packet types carry nothing of interest.
    return True
748
749
def main():
    """Parse the command line, walk the capture and write the audio dumps.

    After the last record has been parsed, whatever audio is still buffered
    for any unicast connection or broadcast stream is written out as well.
    Exits with status 1 when the file does not start with the expected
    btsnoop header.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure")
    parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true")
    parser.add_argument(
        "--header",
        help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.",
        action="store_true")
    parser.add_argument("--ase_handle", help="Set the ASE handle manually.", type=int)

    argv = parser.parse_args()
    btsnoop_file_name = argv.btsnoop_file

    global debug_enable
    global add_header
    global ase_handle
    if argv.verbose:
        debug_enable = True

    if argv.header:
        add_header = True

    # Compare against None so that an explicitly given value is never
    # dropped just because it is falsy.
    if argv.ase_handle is not None:
        ase_handle = int(argv.ase_handle)

    with open(btsnoop_file_name, "rb") as btsnoop_file:
        if btsnoop_file.read(16) != BTSNOOP_HEADER:
            print("Invalid btsnoop header")
            # sys.exit is always available; the bare exit() helper is only
            # installed by the site module.
            sys.exit(1)

        while parse_next_packet(btsnoop_file):
            pass

    # Snapshot the keys: the maps create entries on access.
    for handle in list(connection_map):
        dump_cis_audio_data_to_file(handle)

    for handle in list(bis_stream_map):
        dump_bis_audio_data_to_file(handle)


if __name__ == "__main__":
    main()
793