1#!/usr/bin/env python 2# Copyright 2021 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15# 16# 17# This script extracts LE Audio audio data from btsnoop. 18# Generates a audio dump file where each frame consists of a two-byte frame 19# length information and the coded frame 20# 21# Audio File Name Format: 22# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_ 23# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin 24# 25# 26# Usage: 27# ./dump_le_audio.py BTSNOOP.cfa [-v] [--header] [--ase_handle ASE_HANDLE] 28# 29# -v, --verbose: to enable the verbose log 30# --header: Add the header for LC3 Conformance Interoperability Test Software V.1.0.3 from LC3 test specification. 31# --ase_handle ASE_HANDLE: Set the ASE handle manually. 32# 33# NOTE: 34# Please make sure you HCI Snoop data file includes the following frames: 35# 1. GATT service discovery for "ASE Control Point" chracteristic (if you give the ase_handle via command, the flow could be skipped) 36# 2. GATT config codec via ASE Control Point 37# 3. HCI create CIS to point out the "Start stream", and the data frames. 38# After all hci packet parse finished, would dump all remain audio data as well 39# 40# Correspondsing Spec. 41# ASCS_1.0 42# PACS_1.0 43# BAP_1.0 44# LC3.TS V1.0.3 45# 46from collections import defaultdict 47from os import X_OK 48 49import argparse 50import struct 51import sys 52import time 53 54BTSNOOP_FILE_NAME = "" 55BTSNOOP_HEADER = b'btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea' 56 57COMMADN_PACKET = 1 58ACL_PACKET = 2 59SCO_PACKET = 3 60EVENT_PACKET = 4 61ISO_PACKET = 5 62 63SENT = 0 64RECEIVED = 1 65 66L2CAP_ATT_CID = 4 67 68# opcode for att protocol 69OPCODE_ATT_READ_BY_TYPE_RSP = 0x09 70OPCODE_ATT_WRITE_CMD = 0x52 71 72UUID_ASE_CONTROL_POINT = 0x2BC6 73 74# opcode for ase control 75OPCODE_CONFIG_CODEC = 0x01 76OPCODE_ENABLE = 0x03 77OPCODE_RELEASE = 0x08 78 79# opcode for hci command 80OPCODE_HCI_CREATE_CIS = 0x2064 81OPCODE_REMOVE_ISO_DATA_PATH = 0x206F 82OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F 83OPCODE_LE_CREATE_BIG = 0x2068 84OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E 85 86# HCI event 87EVENT_CODE_LE_META_EVENT = 0x3E 88SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B 89 90TYPE_STREAMING_AUDIO_CONTEXTS = 0x02 91 92TYPE_SAMPLING_FREQUENCIES = 0x01 93TYPE_FRAME_DURATION = 0x02 94TYPE_CHANNEL_ALLOCATION = 0x03 95TYPE_OCTETS_PER_FRAME = 0x04 96 97CONTEXT_TYPE_CONVERSATIONAL = 0x0002 98CONTEXT_TYPE_MEDIA = 0x0004 99CONTEXT_TYPE_RINGTONE = 0x0200 100 101# sample frequency 102SAMPLE_FREQUENCY_8000 = 0x01 103SAMPLE_FREQUENCY_11025 = 0x02 104SAMPLE_FREQUENCY_16000 = 0x03 105SAMPLE_FREQUENCY_22050 = 0x04 106SAMPLE_FREQUENCY_24000 = 0x05 107SAMPLE_FREQUENCY_32000 = 0x06 108SAMPLE_FREQUENCY_44100 = 0x07 109SAMPLE_FREQUENCY_48000 = 0x08 110SAMPLE_FREQUENCY_88200 = 0x09 111SAMPLE_FREQUENCY_96000 = 0x0a 112SAMPLE_FREQUENCY_176400 = 0x0b 113SAMPLE_FREQUENCY_192000 = 0x0c 114SAMPLE_FREQUENCY_384000 = 0x0d 115 116FRAME_DURATION_7_5 = 0x00 117FRAME_DURATION_10 = 0x01 118 119AUDIO_LOCATION_MONO = 0x00 120AUDIO_LOCATION_LEFT = 0x01 121AUDIO_LOCATION_RIGHT = 0x02 122AUDIO_LOCATION_CENTER = 0x04 123 124AD_TYPE_SERVICE_DATA_16_BIT = 0x16 125BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851 126 127packet_number = 0 128debug_enable = False 129add_header = False 130ase_handle = 0xFFFF 131 132 133class Connection: 134 135 def __init__(self): 136 self.ase_handle = 0xFFFF 137 self.number_of_ases = 0 138 self.ase = defaultdict(AseStream) 139 self.context = 0xFFFF 140 self.cis_handle = 0xFFFF 141 self.input_dump = [] 142 self.output_dump = [] 143 self.start_time = 0xFFFFFFFF 144 145 def dump(self): 146 print("start_time: " + str(self.start_time)) 147 print("ase_handle: " + str(self.ase_handle)) 148 print("context type: " + str(self.context)) 149 print("number_of_ases: " + str(self.number_of_ases)) 150 print("cis_handle: " + str(self.cis_handle)) 151 for id, ase_stream in self.ase.items(): 152 print("ase id: " + str(id)) 153 ase_stream.dump() 154 155 156class AseStream: 157 158 def __init__(self): 159 self.sampling_frequencies = 0xFF 160 self.frame_duration = 0xFF 161 self.channel_allocation = 0xFFFFFFFF 162 self.octets_per_frame = 0xFFFF 163 164 def dump(self): 165 print("sampling_frequencies: " + str(self.sampling_frequencies)) 166 print("frame_duration: " + str(self.frame_duration)) 167 print("channel_allocation: " + str(self.channel_allocation)) 168 print("octets_per_frame: " + str(self.octets_per_frame)) 169 170 171class Broadcast: 172 173 def __init__(self): 174 self.num_of_bis = defaultdict(int) # subgroup - num_of_bis 175 self.bis = defaultdict(BisStream) # bis_index - codec_config 176 self.bis_index_handle_map = defaultdict(int) # bis_index - bis_handle 177 self.bis_index_list = [] 178 179 def dump(self): 180 for bis_index, iso_stream in self.bis.items(): 181 print("bis_index: " + str(bis_index) + " bis handle: " + str(self.bis_index_handle_map[bis_index])) 182 iso_stream.dump() 183 184 185class BisStream: 186 187 def __init__(self): 188 self.sampling_frequencies = 0xFF 189 self.frame_duration = 0xFF 190 self.channel_allocation = 0xFFFFFFFF 191 self.octets_per_frame = 0xFFFF 192 self.output_dump = [] 193 self.start_time = 0xFFFFFFFF 194 195 def dump(self): 196 print("start_time: " + str(self.start_time)) 197 print("sampling_frequencies: " + str(self.sampling_frequencies)) 198 print("frame_duration: " + str(self.frame_duration)) 199 print("channel_allocation: " + str(self.channel_allocation)) 200 print("octets_per_frame: " + str(self.octets_per_frame)) 201 202 203connection_map = defaultdict(Connection) 204cis_acl_map = defaultdict(int) 205broadcast_map = defaultdict(Broadcast) 206big_adv_map = defaultdict(int) 207bis_stream_map = defaultdict(BisStream) 208 209 210def generate_header(file, stream, is_cis): 211 sf_case = { 212 SAMPLE_FREQUENCY_8000: 80, 213 SAMPLE_FREQUENCY_11025: 110, 214 SAMPLE_FREQUENCY_16000: 160, 215 SAMPLE_FREQUENCY_22050: 220, 216 SAMPLE_FREQUENCY_24000: 240, 217 SAMPLE_FREQUENCY_32000: 320, 218 SAMPLE_FREQUENCY_44100: 441, 219 SAMPLE_FREQUENCY_48000: 480, 220 SAMPLE_FREQUENCY_88200: 882, 221 SAMPLE_FREQUENCY_96000: 960, 222 SAMPLE_FREQUENCY_176400: 1764, 223 SAMPLE_FREQUENCY_192000: 1920, 224 SAMPLE_FREQUENCY_384000: 2840, 225 } 226 fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10} 227 al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2} 228 229 header = bytearray.fromhex('1ccc1200') 230 if is_cis: 231 for ase in stream.ase.values(): 232 header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) 233 header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) 234 header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 235 0, 48000000) 236 break 237 else: 238 header = header + struct.pack("<H", sf_case[stream.sampling_frequencies]) 239 header = header + struct.pack("<H", int(stream.octets_per_frame * 8 * 10 / fd_case[stream.frame_duration])) 240 header = header + struct.pack("<HHHL", al_case[stream.channel_allocation], fd_case[stream.frame_duration] * 100, 241 0, 48000000) 242 file.write(header) 243 244 245def parse_codec_information(connection_handle, ase_id, packet): 246 length, packet = unpack_data(packet, 1, False) 247 if len(packet) < length: 248 debug_print("Invalid codec configuration length") 249 return packet 250 ase = connection_map[connection_handle].ase[ase_id] 251 while length > 0: 252 config_length, packet = unpack_data(packet, 1, False) 253 config_type, packet = unpack_data(packet, 1, False) 254 value, packet = unpack_data(packet, config_length - 1, False) 255 if config_type == TYPE_SAMPLING_FREQUENCIES: 256 ase.sampling_frequencies = value 257 elif config_type == TYPE_FRAME_DURATION: 258 ase.frame_duration = value 259 elif config_type == TYPE_CHANNEL_ALLOCATION: 260 ase.channel_allocation = value 261 elif config_type == TYPE_OCTETS_PER_FRAME: 262 ase.octets_per_frame = value 263 length -= (config_length + 1) 264 265 return packet 266 267 268def parse_att_read_by_type_rsp(packet, connection_handle): 269 length, packet = unpack_data(packet, 1, False) 270 if length != 7: 271 #ignore the packet, we're only interested in this packet for the characteristic type UUID 272 return 273 274 if length > len(packet): 275 debug_print("Invalid att packet length") 276 return 277 278 attribute_handle, packet = unpack_data(packet, 2, False) 279 if debug_enable: 280 debug_print("attribute_handle - " + str(attribute_handle)) 281 packet = unpack_data(packet, 1, True) 282 value_handle, packet = unpack_data(packet, 2, False) 283 characteristic_uuid, packet = unpack_data(packet, 2, False) 284 if characteristic_uuid == UUID_ASE_CONTROL_POINT: 285 debug_print("ASE Control point found!") 286 connection_map[connection_handle].ase_handle = value_handle 287 288 289def parse_att_write_cmd(packet, connection_handle, timestamp): 290 attribute_handle, packet = unpack_data(packet, 2, False) 291 global ase_handle 292 if ase_handle != 0xFFFF: 293 connection_map[connection_handle].ase_handle = ase_handle 294 295 if connection_map[connection_handle].ase_handle == attribute_handle: 296 if debug_enable: 297 debug_print("Action with ASE Control point") 298 opcode, packet = unpack_data(packet, 1, False) 299 if opcode == OPCODE_CONFIG_CODEC: 300 debug_print("config_codec") 301 (connection_map[connection_handle].number_of_ases, packet) = unpack_data(packet, 1, False) 302 for i in range(connection_map[connection_handle].number_of_ases): 303 ase_id, packet = unpack_data(packet, 1, False) 304 # ignore target_latency, target_phy, codec_id 305 packet = unpack_data(packet, 7, True) 306 packet = parse_codec_information(connection_handle, ase_id, packet) 307 elif opcode == OPCODE_ENABLE: 308 debug_print("enable") 309 numbers_of_ases, packet = unpack_data(packet, 1, False) 310 for i in range(numbers_of_ases): 311 ase_id, packet = unpack_data(packet, 1, False) 312 metadata_length, packet = unpack_data(packet, 1, False) 313 if metadata_length > len(packet): 314 debug_print("Invalid metadata length") 315 return 316 length, packet = unpack_data(packet, 1, False) 317 if length > len(packet): 318 debug_print("Invalid metadata value length") 319 return 320 metadata_type, packet = unpack_data(packet, 1, False) 321 if metadata_type == TYPE_STREAMING_AUDIO_CONTEXTS: 322 (connection_map[connection_handle].context, packet) = unpack_data(packet, 2, False) 323 break 324 325 connection_map[connection_handle].start_time = timestamp 326 if debug_enable: 327 connection_map[connection_handle].dump() 328 329 330def parse_att_packet(packet, connection_handle, flags, timestamp): 331 opcode, packet = unpack_data(packet, 1, False) 332 packet_handle = { 333 (OPCODE_ATT_READ_BY_TYPE_RSP, RECEIVED): (lambda x, y, z: parse_att_read_by_type_rsp(x, y)), 334 (OPCODE_ATT_WRITE_CMD, SENT): (lambda x, y, z: parse_att_write_cmd(x, y, z)) 335 } 336 packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp) 337 338 339def parse_big_codec_information(adv_handle, packet): 340 # Ignore presentation delay 341 packet = unpack_data(packet, 3, True) 342 number_of_subgroup, packet = unpack_data(packet, 1, False) 343 for subgroup in range(number_of_subgroup): 344 num_of_bis, packet = unpack_data(packet, 1, False) 345 broadcast_map[adv_handle].num_of_bis[subgroup] = num_of_bis 346 # Ignore codec id 347 packet = unpack_data(packet, 5, True) 348 length, packet = unpack_data(packet, 1, False) 349 if len(packet) < length: 350 print("Invalid subgroup codec information length") 351 return 352 353 while length > 0: 354 config_length, packet = unpack_data(packet, 1, False) 355 config_type, packet = unpack_data(packet, 1, False) 356 value, packet = unpack_data(packet, config_length - 1, False) 357 if config_type == TYPE_SAMPLING_FREQUENCIES: 358 sampling_frequencies = value 359 elif config_type == TYPE_FRAME_DURATION: 360 frame_duration = value 361 elif config_type == TYPE_OCTETS_PER_FRAME: 362 octets_per_frame = value 363 else: 364 print("Unknown config type") 365 length -= (config_length + 1) 366 367 # Ignore metadata 368 metadata_length, packet = unpack_data(packet, 1, False) 369 packet = unpack_data(packet, metadata_length, True) 370 371 for count in range(num_of_bis): 372 bis_index, packet = unpack_data(packet, 1, False) 373 broadcast_map[adv_handle].bis_index_list.append(bis_index) 374 length, packet = unpack_data(packet, 1, False) 375 if len(packet) < length: 376 print("Invalid level 3 codec information length") 377 return 378 379 while length > 0: 380 config_length, packet = unpack_data(packet, 1, False) 381 config_type, packet = unpack_data(packet, 1, False) 382 value, packet = unpack_data(packet, config_length - 1, False) 383 if config_type == TYPE_CHANNEL_ALLOCATION: 384 channel_allocation = value 385 else: 386 print("Ignored config type") 387 length -= (config_length + 1) 388 389 broadcast_map[adv_handle].bis[bis_index].sampling_frequencies = sampling_frequencies 390 broadcast_map[adv_handle].bis[bis_index].frame_duration = frame_duration 391 broadcast_map[adv_handle].bis[bis_index].octets_per_frame = octets_per_frame 392 broadcast_map[adv_handle].bis[bis_index].channel_allocation = channel_allocation 393 394 return packet 395 396 397def debug_print(log): 398 global packet_number 399 print("#" + str(packet_number) + ": " + log) 400 401 402def unpack_data(data, byte, ignore): 403 if ignore: 404 return data[byte:] 405 406 value = 0 407 if byte == 1: 408 value = struct.unpack("<B", data[:byte])[0] 409 elif byte == 2: 410 value = struct.unpack("<H", data[:byte])[0] 411 elif byte == 4: 412 value = struct.unpack("<I", data[:byte])[0] 413 return value, data[byte:] 414 415 416def parse_command_packet(packet, timestamp): 417 opcode, packet = unpack_data(packet, 2, False) 418 if opcode == OPCODE_HCI_CREATE_CIS: 419 debug_print("OPCODE_HCI_CREATE_CIS") 420 421 length, packet = unpack_data(packet, 1, False) 422 if length != len(packet): 423 debug_print("Invalid cmd length") 424 return 425 cis_count, packet = unpack_data(packet, 1, False) 426 for i in range(cis_count): 427 cis_handle, packet = unpack_data(packet, 2, False) 428 cis_handle &= 0x0EFF 429 acl_handle, packet = unpack_data(packet, 2, False) 430 connection_map[acl_handle].cis_handle = cis_handle 431 cis_acl_map[cis_handle] = acl_handle 432 433 if debug_enable: 434 connection_map[acl_handle].dump() 435 elif opcode == OPCODE_REMOVE_ISO_DATA_PATH: 436 debug_print("OPCODE_REMOVE_ISO_DATA_PATH") 437 438 length, packet = unpack_data(packet, 1, False) 439 if length != len(packet): 440 debug_print("Invalid cmd length") 441 return 442 443 iso_handle, packet = unpack_data(packet, 2, False) 444 # CIS stream 445 if iso_handle in cis_acl_map: 446 acl_handle = cis_acl_map[iso_handle] 447 dump_cis_audio_data_to_file(acl_handle) 448 # To Do: BIS stream 449 elif iso_handle in bis_stream_map: 450 dump_bis_audio_data_to_file(iso_handle) 451 elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA: 452 debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA") 453 454 length, packet = unpack_data(packet, 1, False) 455 if length != len(packet): 456 debug_print("Invalid cmd length") 457 return 458 459 if length < 21: 460 debug_print("Ignored. Not basic audio announcement") 461 return 462 463 adv_hdl, packet = unpack_data(packet, 1, False) 464 #ignore operation, advertising_data_length 465 packet = unpack_data(packet, 2, True) 466 length, packet = unpack_data(packet, 1, False) 467 if length != len(packet): 468 debug_print("Invalid AD element length") 469 return 470 471 ad_type, packet = unpack_data(packet, 1, False) 472 service, packet = unpack_data(packet, 2, False) 473 if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE: 474 debug_print("Ignored. Not basic audio announcement") 475 return 476 477 packet = parse_big_codec_information(adv_hdl, packet) 478 elif opcode == OPCODE_LE_CREATE_BIG: 479 debug_print("OPCODE_LE_CREATE_BIG") 480 481 length, packet = unpack_data(packet, 1, False) 482 if length != len(packet) and length < 31: 483 debug_print("Invalid Create BIG command length") 484 return 485 486 big_handle, packet = unpack_data(packet, 1, False) 487 adv_handle, packet = unpack_data(packet, 1, False) 488 big_adv_map[big_handle] = adv_handle 489 elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH: 490 debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH") 491 length, packet = unpack_data(packet, 1, False) 492 if len(packet) != length: 493 debug_print("Invalid LE SETUP ISO DATA PATH command length") 494 return 495 496 iso_handle, packet = unpack_data(packet, 2, False) 497 if iso_handle in bis_stream_map: 498 bis_stream_map[iso_handle].start_time = timestamp 499 500 501def parse_event_packet(packet): 502 event_code, packet = unpack_data(packet, 1, False) 503 if event_code != EVENT_CODE_LE_META_EVENT: 504 return 505 506 length, packet = unpack_data(packet, 1, False) 507 if len(packet) != length: 508 print("Invalid LE mata event length") 509 return 510 511 subevent_code, packet = unpack_data(packet, 1, False) 512 if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE: 513 return 514 515 status, packet = unpack_data(packet, 1, False) 516 if status != 0x00: 517 debug_print("Create_BIG failed") 518 return 519 520 big_handle, packet = unpack_data(packet, 1, False) 521 if big_handle not in big_adv_map: 522 print("Invalid BIG handle") 523 return 524 adv_handle = big_adv_map[big_handle] 525 # Ignore, we don't care these parameter 526 packet = unpack_data(packet, 15, True) 527 num_of_bis, packet = unpack_data(packet, 1, False) 528 for count in range(num_of_bis): 529 bis_handle, packet = unpack_data(packet, 2, False) 530 bis_index = broadcast_map[adv_handle].bis_index_list[count] 531 broadcast_map[adv_handle].bis_index_handle_map[bis_index] = bis_handle 532 bis_stream_map[bis_handle] = broadcast_map[adv_handle].bis[bis_index] 533 534 535def convert_time_str(timestamp): 536 """This function converts time to string format.""" 537 timestamp_sec = float(timestamp) / 1000000 538 local_timestamp = time.localtime(timestamp_sec) 539 ms = timestamp_sec - int(timestamp_sec) 540 ms_str = "{0:06}".format(int(round(ms * 1000000))) 541 542 str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp) 543 full_str_format = str_format + "_" + ms_str 544 545 return full_str_format 546 547 548def dump_cis_audio_data_to_file(acl_handle): 549 if debug_enable: 550 connection_map[acl_handle].dump() 551 file_name = "" 552 context_case = { 553 CONTEXT_TYPE_CONVERSATIONAL: "Conversational", 554 CONTEXT_TYPE_MEDIA: "Media", 555 CONTEXT_TYPE_RINGTONE: "Ringtone" 556 } 557 file_name += context_case.get(connection_map[acl_handle].context, "Unknown") 558 for ase in connection_map[acl_handle].ase.values(): 559 sf_case = { 560 SAMPLE_FREQUENCY_8000: "8000", 561 SAMPLE_FREQUENCY_11025: "11025", 562 SAMPLE_FREQUENCY_16000: "16000", 563 SAMPLE_FREQUENCY_22050: "22050", 564 SAMPLE_FREQUENCY_24000: "24000", 565 SAMPLE_FREQUENCY_32000: "32000", 566 SAMPLE_FREQUENCY_44100: "44100", 567 SAMPLE_FREQUENCY_48000: "48000", 568 SAMPLE_FREQUENCY_88200: "88200", 569 SAMPLE_FREQUENCY_96000: "96000", 570 SAMPLE_FREQUENCY_176400: "176400", 571 SAMPLE_FREQUENCY_192000: "192000", 572 SAMPLE_FREQUENCY_384000: "284000" 573 } 574 file_name += ("_sf" + sf_case[ase.sampling_frequencies]) 575 fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} 576 file_name += ("_fd" + fd_case[ase.frame_duration]) 577 al_case = { 578 AUDIO_LOCATION_MONO: "mono", 579 AUDIO_LOCATION_LEFT: "left", 580 AUDIO_LOCATION_RIGHT: "right", 581 AUDIO_LOCATION_CENTER: "center" 582 } 583 file_name += ("_" + al_case[ase.channel_allocation]) 584 file_name += ("_frame" + str(ase.octets_per_frame)) 585 file_name += ("_" + convert_time_str(connection_map[acl_handle].start_time)) 586 break 587 588 if connection_map[acl_handle].input_dump != []: 589 debug_print("Dump unicast input...") 590 f = open(file_name + "_input.bin", 'wb') 591 if add_header == True: 592 generate_header(f, connection_map[acl_handle], True) 593 arr = bytearray(connection_map[acl_handle].input_dump) 594 f.write(arr) 595 f.close() 596 connection_map[acl_handle].input_dump = [] 597 598 if connection_map[acl_handle].output_dump != []: 599 debug_print("Dump unicast output...") 600 f = open(file_name + "_output.bin", 'wb') 601 if add_header == True: 602 generate_header(f, connection_map[acl_handle], True) 603 arr = bytearray(connection_map[acl_handle].output_dump) 604 f.write(arr) 605 f.close() 606 connection_map[acl_handle].output_dump = [] 607 608 return 609 610 611def dump_bis_audio_data_to_file(iso_handle): 612 if debug_enable: 613 bis_stream_map[iso_handle].dump() 614 file_name = "broadcast" 615 sf_case = { 616 SAMPLE_FREQUENCY_8000: "8000", 617 SAMPLE_FREQUENCY_11025: "11025", 618 SAMPLE_FREQUENCY_16000: "16000", 619 SAMPLE_FREQUENCY_22050: "22050", 620 SAMPLE_FREQUENCY_24000: "24000", 621 SAMPLE_FREQUENCY_32000: "32000", 622 SAMPLE_FREQUENCY_44100: "44100", 623 SAMPLE_FREQUENCY_48000: "48000", 624 SAMPLE_FREQUENCY_88200: "88200", 625 SAMPLE_FREQUENCY_96000: "96000", 626 SAMPLE_FREQUENCY_176400: "176400", 627 SAMPLE_FREQUENCY_192000: "192000", 628 SAMPLE_FREQUENCY_384000: "284000" 629 } 630 file_name += ("_sf" + sf_case[bis_stream_map[iso_handle].sampling_frequencies]) 631 fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} 632 file_name += ("_fd" + fd_case[bis_stream_map[iso_handle].frame_duration]) 633 al_case = { 634 AUDIO_LOCATION_MONO: "mono", 635 AUDIO_LOCATION_LEFT: "left", 636 AUDIO_LOCATION_RIGHT: "right", 637 AUDIO_LOCATION_CENTER: "center" 638 } 639 file_name += ("_" + al_case[bis_stream_map[iso_handle].channel_allocation]) 640 file_name += ("_frame" + str(bis_stream_map[iso_handle].octets_per_frame)) 641 file_name += ("_" + convert_time_str(bis_stream_map[iso_handle].start_time)) 642 643 if bis_stream_map[iso_handle].output_dump != []: 644 debug_print("Dump broadcast output...") 645 f = open(file_name + "_output.bin", 'wb') 646 if add_header == True: 647 generate_header(f, bis_stream_map[iso_handle], False) 648 arr = bytearray(bis_stream_map[iso_handle].output_dump) 649 f.write(arr) 650 f.close() 651 bis_stream_map[iso_handle].output_dump = [] 652 653 return 654 655 656def parse_acl_packet(packet, flags, timestamp): 657 # Check the minimum acl length, HCI leader (4 bytes) 658 # + L2CAP header (4 bytes) 659 if len(packet) < 8: 660 debug_print("Invalid acl data length.") 661 return 662 663 connection_handle, packet = unpack_data(packet, 2, False) 664 connection_handle = connection_handle & 0x0FFF 665 if connection_handle > 0x0EFF: 666 debug_print("Invalid packet handle, skip") 667 return 668 total_length, packet = unpack_data(packet, 2, False) 669 if total_length != len(packet): 670 debug_print("Invalid total length, skip") 671 return 672 pdu_length, packet = unpack_data(packet, 2, False) 673 channel_id, packet = unpack_data(packet, 2, False) 674 if pdu_length != len(packet): 675 debug_print("Invalid pdu length, skip") 676 return 677 678 if debug_enable: 679 debug_print("ACL connection_handle - " + str(connection_handle)) 680 # Parse ATT protocol 681 if channel_id == L2CAP_ATT_CID: 682 parse_att_packet(packet, connection_handle, flags, timestamp) 683 684 685def parse_iso_packet(packet, flags): 686 iso_handle, packet = unpack_data(packet, 2, False) 687 iso_handle &= 0x0EFF 688 iso_data_load_length, packet = unpack_data(packet, 2, False) 689 if iso_data_load_length != len(packet): 690 debug_print("Invalid iso data load length") 691 return 692 693 # Ignore timestamp, sequence number 694 packet = unpack_data(packet, 6, True) 695 iso_sdu_length, packet = unpack_data(packet, 2, False) 696 if len(packet) == 0: 697 debug_print("The iso data is empty") 698 elif iso_sdu_length != len(packet): 699 debug_print("Invalid iso sdu length") 700 return 701 702 # CIS stream 703 if iso_handle in cis_acl_map: 704 acl_handle = cis_acl_map[iso_handle] 705 if flags == SENT: 706 connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet))) 707 connection_map[acl_handle].output_dump.extend(list(packet)) 708 elif flags == RECEIVED: 709 connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet))) 710 connection_map[acl_handle].input_dump.extend(list(packet)) 711 elif iso_handle in bis_stream_map: 712 bis_stream_map[iso_handle].output_dump.extend(struct.pack("<H", len(packet))) 713 bis_stream_map[iso_handle].output_dump.extend(list(packet)) 714 715 716def parse_next_packet(btsnoop_file): 717 global packet_number 718 packet_number += 1 719 packet_header = btsnoop_file.read(25) 720 if len(packet_header) != 25: 721 return False 722 723 (length_original, length_captured, flags, dropped_packets, timestamp, type) = struct.unpack( 724 ">IIIIqB", packet_header) 725 726 if length_original != length_captured: 727 debug_print("Filtered btnsoop, can not be parsed") 728 return False 729 730 packet = btsnoop_file.read(length_captured - 1) 731 if len(packet) != length_original - 1: 732 debug_print("Invalid packet length!") 733 return False 734 735 if dropped_packets: 736 debug_print("Invalid droped value") 737 return False 738 739 packet_handle = { 740 COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x, z)), 741 ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)), 742 SCO_PACKET: (lambda x, y, z: None), 743 EVENT_PACKET: (lambda x, y, z: parse_event_packet(x)), 744 ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y)) 745 } 746 packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp) 747 return True 748 749 750def main(): 751 parser = argparse.ArgumentParser() 752 parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure") 753 parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true") 754 parser.add_argument( 755 "--header", 756 help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.", 757 action="store_true") 758 parser.add_argument("--ase_handle", help="Set the ASE handle manually.", type=int) 759 760 argv = parser.parse_args() 761 BTSNOOP_FILE_NAME = argv.btsnoop_file 762 763 global debug_enable 764 global add_header 765 global ase_handle 766 if argv.verbose: 767 debug_enable = True 768 769 if argv.header: 770 add_header = True 771 772 if argv.ase_handle: 773 ase_handle = int(argv.ase_handle) 774 775 with open(BTSNOOP_FILE_NAME, "rb") as btsnoop_file: 776 if btsnoop_file.read(16) != BTSNOOP_HEADER: 777 print("Invalid btsnoop header") 778 exit(1) 779 780 while True: 781 if not parse_next_packet(btsnoop_file): 782 break 783 784 for handle in connection_map.keys(): 785 dump_cis_audio_data_to_file(handle) 786 787 for handle in bis_stream_map.keys(): 788 dump_bis_audio_data_to_file(handle) 789 790 791if __name__ == "__main__": 792 main() 793