#!/usr/bin/env python
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
#
#
#
# This script extracts Hearing Aid audio data from btsnoop.
# It generates a valid audio file which can be played using a player such as
# smplayer.
#
# Audio File Name Format:
# [PEER_ADDRESS]-[START TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
#
# Player:
# smplayer
#
# NOTE:
# Please make sure your HCI Snoop data file includes the following frames:
# HearingAid "LE Enhanced Connection Complete", GATT write for Audio Control
# Point with "Start cmd", and the data frames.

import argparse
import os
import struct
import sys
import time

# Keys used in the per-packet "result" dict and the per-device audio_data dict.
IS_SENT = "IS_SENT"
PEER_ADDRESS = "PEER_ADDRESS"
CONNECTION_HANDLE = "CONNECTION_HANDLE"
AUDIO_CONTROL_ATTR_HANDLE = "AUDIO_CONTROL_ATTR_HANDLE"
START = "START"
TIMESTAMP = "TIMESTAMP"
CODEC = "CODEC"
SAMPLE_RATE = "SAMPLE_RATE"
AUDIO_TYPE = "AUDIO_TYPE"
AUDIO_DATA_B = "AUDIO_DATA_B"

# 128-bit UUID of the Audio Control Point characteristic (hex, MSB first).
AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0"
# Packet timestamps are divided by this to obtain seconds.
SEC_CONVERT = 1000000
# Output folder selected with -f/--folder (None: current directory).
folder = None

# Attribute handle forced with -a/--attr-handle (None: not forced).
force_audio_control_attr_handle = None
# Attribute handle used when none was forced or discovered in the log.
default_audio_control_attr_handle = 0x0079

# Per-device state keyed by peer address; see update_audio_data().
audio_data = {}

# Audio control "Start" parameters -> human readable names.
_CODEC_TABLE = {
    0x01: ("G722", "16KHZ"),
    0x02: ("G722", "24KHZ"),
}
_AUDIO_TYPE_TABLE = {
    0x01: "Ringtone",
    0x02: "Phonecall",
    0x03: "Media",
}

#=======================================================================
# Parse ACL Data Function
#=======================================================================

#-----------------------------------------------------------------------
# Parse Hearing Aid Packet
#-----------------------------------------------------------------------


def parse_acl_ha_audio_data(data, result):
    """Record the audio payload of one Hearing Aid data packet.

    Args:
        data: bytes of the SDU; the first byte is the audio packet number.
        result: per-packet dict; CONNECTION_HANDLE selects the device.
    """
    if len(data) < 2:
        return
    # Remove audio packet number
    audio_data_b = data[1:]
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      AUDIO_DATA_B, audio_data_b)


def parse_acl_ha_audio_type(data, result):
    """Parse the audio type byte of an audio control "Start" command.

    Nothing is recorded when the byte is missing from ``data``.
    """
    audio_type, data = unpack_data(data, 1)
    if audio_type is None:
        return
    audio_type = _AUDIO_TYPE_TABLE.get(audio_type, "Unknown")
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      AUDIO_TYPE, audio_type)


def parse_acl_ha_codec(data, result):
    """Parse codec and sample rate of an audio control "Start" command.

    Unrecognised (or missing) codec values are recorded as "Unknown".
    The remaining bytes are handed to parse_acl_ha_audio_type().
    """
    codec, data = unpack_data(data, 1)
    codec, sample_rate = _CODEC_TABLE.get(codec, ("Unknown", "Unknown"))
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      CODEC, codec)
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      SAMPLE_RATE, sample_rate)
    parse_acl_ha_audio_type(data, result)


def parse_acl_ha_audio_control_cmd(data, result):
    """Parse an audio control command: 0x01 is start, 0x02 is stop.

    A start command records START=True and the packet timestamp and then
    parses codec/audio type; a stop command records START=False, which
    makes update_audio_data() dump any audio that was being captured.
    """
    control_cmd, data = unpack_data(data, 1)
    if control_cmd == 0x01:
        update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                          START, True)
        update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                          TIMESTAMP, result[TIMESTAMP])
        parse_acl_ha_codec(data, result)
    elif control_cmd == 0x02:
        update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                          START, False)


#-----------------------------------------------------------------------
# Parse ACL Packet
#-----------------------------------------------------------------------

def parse_acl_att_long_uuid(data, result):
    """Parse an ATT long UUID entry to learn the audio control attr handle."""
    # len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) +
    # attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes
    if len(data) < 22:
        return
    # skip unpack len, start_attr_handle, properties.
    data = data[4:]
    attr_handle, data = unpack_data(data, 2)
    # The UUID is stored least significant byte first; reverse it so it can
    # be compared with AUDIO_CONTROL_POINT_UUID. bytearray() yields integers
    # on both Python 2 and Python 3.
    long_uuid = "".join(
        "{0:02x}".format(octet) for octet in reversed(bytearray(data[:16])))
    # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle.
    if long_uuid == AUDIO_CONTROL_POINT_UUID:
        update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                          AUDIO_CONTROL_ATTR_HANDLE, attr_handle)


def parse_acl_opcode(data, result):
    """Dispatch an ATT PDU on its opcode."""
    # opcode (1 byte) = 1 bytes
    if len(data) < 1:
        return
    opcode, data = unpack_data(data, 1)
    # Check opcode is 0x12 (write request) and attr_handle is
    # audio_control_attr_handle for check it is HA audio control cmd.
    if result[IS_SENT] and opcode == 0x12:
        if len(data) < 2:
            return
        attr_handle, data = unpack_data(data, 2)
        expected_handle = get_audio_control_attr_handle(
            result[CONNECTION_HANDLE])
        if attr_handle == expected_handle:
            parse_acl_ha_audio_control_cmd(data, result)
    # Check opcode is 0x09 (read response) to parse ATT long UUID.
    elif not result[IS_SENT] and opcode == 0x09:
        parse_acl_att_long_uuid(data, result)


def parse_acl_handle(data, result):
    """Parse the ACL/L2CAP headers and route the payload.

    Channel ids up to 0x003F are parsed as ATT; sent packets on channel ids
    0x0040-0x007F are treated as "CoC Data Packet" audio frames.
    """
    # connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes)
    # + channel_id (2 bytes) = 8 bytes
    if len(data) < 8:
        return
    connection_handle, data = unpack_data(data, 2)
    connection_handle = connection_handle & 0x0FFF
    # skip unpack total_len
    data = data[2:]
    pdu, data = unpack_data(data, 2)
    channel_id, data = unpack_data(data, 2)

    # Check ATT packet or "Coc Data Packet" to get ATT information and audio
    # data.
    if connection_handle > 0x0EFF:
        return
    if channel_id <= 0x003F:
        result[CONNECTION_HANDLE] = connection_handle
        parse_acl_opcode(data, result)
    elif result[IS_SENT] and 0x0040 <= channel_id <= 0x007F:
        result[CONNECTION_HANDLE] = connection_handle
        sdu, data = unpack_data(data, 2)
        # Only frames whose SDU length matches the PDU length are recorded
        # (sdu is None, and therefore never equal, when the field is
        # missing).
        if pdu - 2 == sdu:
            parse_acl_ha_audio_data(data, result)


#=======================================================================
# Parse HCI EVT Function
#=======================================================================


def parse_hci_evt_peer_address(data, result):
    """Parse the peer address of a connection complete event.

    Up to three consecutive 6-byte addresses are examined and the first one
    that is not all zeros is used. The device is then registered in
    audio_data together with result[CONNECTION_HANDLE].
    """
    peer_address_list = []
    for _ in range(3):
        if len(data) < 6:
            return
        candidate = bytearray(data[:6])
        # Check the address is empty or not.
        if any(candidate):
            peer_address_list = ["{0:02x}".format(octet)
                                 for octet in candidate]
            break
        data = data[6:]
    # The address is stored least significant byte first.
    peer_address_list.reverse()
    peer_address = "_".join(peer_address_list)
    update_audio_data("", "", PEER_ADDRESS, peer_address)
    update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE,
                      result[CONNECTION_HANDLE])


def parse_hci_evt_code(data, result):
    """Parse an HCI event, looking for LE Enhanced Connection Complete."""
    # hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte)
    # + status (1 byte) + connection_handle (2 bytes) + role (1 byte)
    # + address_type (1 byte) = 8 bytes
    if len(data) < 8:
        return
    hci_evt, data = unpack_data(data, 1)
    # skip unpack param_total_len.
    data = data[1:]
    sub_event, data = unpack_data(data, 1)
    status, data = unpack_data(data, 1)
    connection_handle, data = unpack_data(data, 2)
    connection_handle = connection_handle & 0x0FFF
    # skip unpack role, address_type.
    data = data[2:]
    # We will directly check it is LE Enhanced Connection Complete or not
    # for get Connection Handle and Address.
    if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \
            and status == 0x00 and connection_handle <= 0x0EFF:
        result[CONNECTION_HANDLE] = connection_handle
        parse_hci_evt_peer_address(data, result)


#=======================================================================
# Common Parse Function
#=======================================================================


def parse_packet_data(data, result):
    """Dispatch one packet on its type: 0x02 is ACL, 0x04 is an HCI event.

    Empty packets and other packet types are ignored.
    """
    packet_type, data = unpack_data(data, 1)
    if packet_type == 0x02:
        # Try to check HearingAid audio control packet and data packet.
        parse_acl_handle(data, result)
    elif packet_type == 0x04:
        # Try to check HearingAid connection successful packet.
        parse_hci_evt_code(data, result)


def parse_packet(btsnoop_file):
    """Read and parse one packet record from the btsnoop file.

    Args:
        btsnoop_file: binary file object positioned at a packet record.

    Returns:
        True when a record was consumed, False when parsing must stop (end
        of file, short read, or a record whose original and included
        lengths differ).
    """
    packet_result = {}

    # ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes)
    # + drop (4 bytes) + timestamp (8 bytes) = 24 bytes
    packet_header = btsnoop_file.read(24)
    if len(packet_header) != 24:
        return False

    ori_len, include_len, packet_flag, drop, timestamp = \
        struct.unpack(">IIIIq", packet_header)

    if ori_len != include_len:
        return False

    packet_data = btsnoop_file.read(ori_len)
    if len(packet_data) != ori_len:
        return False
    if packet_flag != 2 and drop == 0:
        packet_result[IS_SENT] = (packet_flag == 0)
        packet_result[TIMESTAMP] = convert_time_str(timestamp)
        parse_packet_data(packet_data, packet_result)

    return True


#=======================================================================
# Update and DumpData Function
#=======================================================================


def dump_audio_data(data):
    """Write the recorded audio of one device to a file.

    The file is named
    [PEER_ADDRESS]-[TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC] and is
    created inside the global ``folder`` when one is configured. Audio
    attributes that were never recorded are written as "Unknown".

    Args:
        data: per-device dict from audio_data.
    """
    file_type = "." + data.get(CODEC, "Unknown")
    base_name = "-".join([
        data[PEER_ADDRESS],
        data[TIMESTAMP],
        data.get(AUDIO_TYPE, "Unknown"),
        data.get(SAMPLE_RATE, "Unknown"),
    ])
    if folder is not None:
        if not os.path.exists(folder):
            os.makedirs(folder)
        file_name = os.path.join(folder, base_name + file_type)
    else:
        file_name = base_name + file_type
    sys.stdout.write("Start to dump audio file : " + file_name + "\n")
    if AUDIO_DATA_B in data:
        with open(file_name, "wb") as g722_file:
            g722_file.write(data[AUDIO_DATA_B])
        sys.stdout.write("Finished to dump Audio File: %s\n\n" % file_name)
    else:
        sys.stdout.write("Fail to dump Audio File: %s\n" % file_name)
        sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")


def update_audio_data(relate_key, relate_value, key, value):
    """
    This function records the dump audio file related information.
    audio_data = {
      PEER_ADDRESS:{
        PEER_ADDRESS: PEER_ADDRESS,
        CONNECTION_HANDLE: CONNECTION_HANDLE,
        AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
        START: True or False,
        TIMESTAMP: START_TIMESTAMP,
        CODEC: CODEC,
        SAMPLE_RATE: SAMPLE_RATE,
        AUDIO_TYPE: AUDIO_TYPE,
        AUDIO_DATA_B: AUDIO_DATA_B
      },
      PEER_ADDRESS_2:{
        PEER_ADDRESS: PEER_ADDRESS,
        CONNECTION_HANDLE: CONNECTION_HANDLE,
        AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
        START: True or False,
        TIMESTAMP: START_TIMESTAMP,
        CODEC: CODEC,
        SAMPLE_RATE: SAMPLE_RATE,
        AUDIO_TYPE: AUDIO_TYPE,
        AUDIO_DATA_B: AUDIO_DATA_B
      }
    }

    Args:
        relate_key: key used to find the device entries to update (ignored
            when ``key`` is PEER_ADDRESS).
        relate_value: value ``relate_key`` must have in a matching entry.
        key: key to set. PEER_ADDRESS registers (or resets) a device,
            START dumps and clears a running capture before being set, and
            AUDIO_DATA_B appends to the capture if it has been started.
        value: value to store (bytes to append for AUDIO_DATA_B).
    """
    if key == PEER_ADDRESS:
        if value in audio_data:
            # Dump audio data and clear previous data.
            update_audio_data(key, value, START, False)
            # Extra clear CONNECTION_HANDLE due to new connection create.
            audio_data[value].pop(CONNECTION_HANDLE, "")
        else:
            audio_data[value] = {key: value}
        return

    for device in audio_data.values():
        if relate_key not in device or device[relate_key] != relate_value:
            continue
        if key == START:
            if device.get(key):
                dump_audio_data(device)
            # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
            # AUDIO_CONTROL_ATTR_HANDLE.
            for stale_key in (key, TIMESTAMP, CODEC, SAMPLE_RATE,
                              AUDIO_TYPE, AUDIO_DATA_B):
                device.pop(stale_key, "")
        elif key == AUDIO_DATA_B:
            if not device.get(START):
                # Audio doesn't start, don't record.
                return
            # Accumulate in a bytearray: appending is amortised O(1), while
            # re-concatenating immutable bytes for every packet is
            # quadratic in the capture size.
            device.setdefault(AUDIO_DATA_B, bytearray()).extend(value)
            continue
        device[key] = value


#=======================================================================
# Tool Function
#=======================================================================


def get_audio_control_attr_handle(connection_handle):
    """Return the audio control attribute handle for a connection.

    Precedence: the handle forced on the command line, then the handle
    recorded in audio_data for ``connection_handle``, then the default.
    """
    # If force_audio_control_attr_handle is set, will use it first.
    if force_audio_control_attr_handle is not None:
        return force_audio_control_attr_handle

    # Try to check the audio_control_attr_handle is record into audio_data.
    for device in audio_data.values():
        if device.get(CONNECTION_HANDLE, None) == connection_handle \
                and CONNECTION_HANDLE in device \
                and AUDIO_CONTROL_ATTR_HANDLE in device:
            return device[AUDIO_CONTROL_ATTR_HANDLE]

    # Return default attr_handle if audio_data doesn't record it.
    return default_audio_control_attr_handle


def unpack_data(data, byte):
    """Unpack a little-endian unsigned integer from the front of ``data``.

    Args:
        data: bytes to read from.
        byte: width of the field in bytes (1 or 2).

    Returns:
        A ``(value, remaining)`` tuple where ``remaining`` is ``data``
        without its first ``byte`` bytes. ``value`` is None when ``data`` is
        shorter than the field, and "" for unsupported widths.
    """
    remaining = data[byte:]
    if byte == 1:
        fmt = "<B"
    elif byte == 2:
        fmt = "<H"
    else:
        return "", remaining
    if len(data) < byte:
        return None, remaining
    # unpack_from accepts bytes on Python 3 and str on Python 2, unlike
    # indexing, which yields integers on Python 3.
    return struct.unpack_from(fmt, data)[0], remaining


def convert_time_str(timestamp):
    """Convert a packet timestamp to a "MM_DD__HH_MM_SS_uuuuuu" string.

    The timestamp is split with integer arithmetic so the microsecond part
    is exact.
    NOTE(review): the value is passed to time.localtime() without any epoch
    adjustment -- confirm against the btsnoop timestamp definition.
    """
    seconds, microseconds = divmod(int(timestamp), SEC_CONVERT)
    time_str = time.strftime("%m_%d__%H_%M_%S", time.localtime(seconds))
    return time_str + "_" + "{0:06}".format(microseconds)


def _register_fake_connection(connection_handle, no_start, codec):
    """Register a placeholder device for a user supplied connection handle."""
    fake_name = "ConnectionHandle" + str(connection_handle)
    update_audio_data("", "", PEER_ADDRESS, fake_name)
    update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE,
                      connection_handle)
    if no_start:
        update_audio_data(PEER_ADDRESS, fake_name, START, True)
        update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP, "Unknown")
        update_audio_data(PEER_ADDRESS, fake_name, CODEC, codec)
        update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
        update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")


def set_config():
    """Parse the command line, apply it to the globals, validate it.

    Returns:
        The path of the btsnoop file to parse.

    Invalid arguments are reported through ArgumentParser.error(), which
    prints the message and exits the process.
    """
    global folder
    global force_audio_control_attr_handle

    argv_parser = argparse.ArgumentParser(
        description="Extracts Hearing Aid audio data from BTSNOOP.")
    argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.")
    argv_parser.add_argument("-f", "--folder", help="select output folder.",
                             dest="folder")
    argv_parser.add_argument("-c1", "--connection-handle1",
                             help="set a fake connection handle 1 to capture "
                             "audio dump.", dest="connection_handle1",
                             type=int)
    argv_parser.add_argument("-c2", "--connection-handle2",
                             help="set a fake connection handle 2 to capture "
                             "audio dump.", dest="connection_handle2",
                             type=int)
    argv_parser.add_argument("-ns", "--no-start",
                             help="No audio 'Start' cmd is needed before "
                             "extracting audio data.", dest="no_start",
                             default="False")
    argv_parser.add_argument("-dc", "--default-codec",
                             help="set a default codec.", dest="codec",
                             default="G722")
    argv_parser.add_argument("-a", "--attr-handle",
                             help="force to select audio control attr handle.",
                             dest="audio_control_attr_handle", type=int)
    arg = argv_parser.parse_args()

    if arg.folder is not None:
        folder = arg.folder

    if arg.connection_handle1 is not None \
            and arg.connection_handle1 == arg.connection_handle2:
        argv_parser.error("connection_handle1 can't be same with "
                          "connection_handle2")

    no_start_text = arg.no_start.lower()
    if no_start_text not in ("true", "false"):
        argv_parser.error(
            "-ns/--no-start arg is invalid, it should be true/false.")
    no_start = (no_start_text == "true")

    if arg.connection_handle1 is not None:
        _register_fake_connection(arg.connection_handle1, no_start, arg.codec)

    if arg.connection_handle2 is not None:
        _register_fake_connection(arg.connection_handle2, no_start, arg.codec)

    if arg.audio_control_attr_handle is not None:
        force_audio_control_attr_handle = arg.audio_control_attr_handle

    if not os.path.isfile(arg.BTSNOOP):
        argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP)
    return arg.BTSNOOP


def main():
    """Validate the btsnoop header, parse every packet, dump the audio."""
    btsnoop_file_name = set_config()

    with open(btsnoop_file_name, "rb") as btsnoop_file:
        identification = btsnoop_file.read(8)
        # The file is read in binary mode, so compare against bytes.
        if identification != b"btsnoop\0":
            sys.stderr.write(
                "Check identification fail. It is not correct btsnoop file.")
            sys.exit(1)

        file_header = btsnoop_file.read(4 + 4)
        ver, data_link = (None, None)
        if len(file_header) == 8:
            ver, data_link = struct.unpack(">II", file_header)
        if (ver != 1) or (data_link != 1002):
            sys.stderr.write(
                "Check ver or dataLink fail. It is not correct btsnoop file.")
            sys.exit(1)

        while parse_packet(btsnoop_file):
            pass

        # Dump captures that were still running when the log ended.
        for device in audio_data.values():
            if device.get(START, False):
                dump_audio_data(device)


if __name__ == "__main__":
    main()