• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15#
16#
17#
18#
19# This script extracts Hearing Aid audio data from btsnoop.
# Generates a valid audio file which can be played using a player like smplayer.
21#
22# Audio File Name Format:
23# [PEER_ADDRESS]-[START TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
24#
25# Player:
26# smplayer
27#
28# NOTE:
# Please make sure your HCI Snoop data file includes the following frames:
# HearingAid "LE Enhanced Connection Complete", GATT write for Audio Control
# Point with "Start cmd", and the data frames.
32
33import argparse
34import os
35import struct
36import sys
37import time
38
# Keys used in the per-packet "result" dict and in the per-device records
# stored in audio_data.
PEER_ADDRESS = "PEER_ADDRESS"
CONNECTION_HANDLE = "CONNECTION_HANDLE"
AUDIO_CONTROL_ATTR_HANDLE = "AUDIO_CONTROL_ATTR_HANDLE"
IS_SENT = "IS_SENT"
TIMESTAMP = "TIMESTAMP"
START = "START"
CODEC = "CODEC"
SAMPLE_RATE = "SAMPLE_RATE"
AUDIO_TYPE = "AUDIO_TYPE"
AUDIO_DATA_B = "AUDIO_DATA_B"

# Audio Control Point characteristic UUID as lowercase hex without dashes; it
# is compared against UUIDs found in ATT responses to learn the attr handle.
AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0"

# Divisor used to turn a packet timestamp into seconds.
SEC_CONVERT = 1000000

# Output folder chosen on the command line (None means current directory).
folder = None

# Attribute handle forced from the command line; when None the handle found
# in the capture, or else the default below, is used.
force_audio_control_attr_handle = None
default_audio_control_attr_handle = 0x0079

# Per-device state, keyed by peer address string (see update_audio_data).
audio_data = {}
58
59#=======================================================================
60# Parse ACL Data Function
61#=======================================================================
62
63#-----------------------------------------------------------------------
64# Parse Hearing Aid Packet
65#-----------------------------------------------------------------------
66
67
def parse_acl_ha_audio_data(data, result):
  """Record the audio payload carried by one Hearing Aid data packet.

  The first byte of data is the audio packet number and is discarded; the
  remaining bytes are handed to update_audio_data for the device that owns
  result[CONNECTION_HANDLE]. Packets shorter than two bytes are ignored.
  """
  # Need the packet-number byte plus at least one byte of audio.
  if len(data) >= 2:
    payload = data[1:]
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      AUDIO_DATA_B, payload)
76
77
def parse_acl_ha_audio_type(data, result):
  """Parse the audio type byte of an HA audio control "Start" command.

  The first byte of data selects the audio type (0x01 Ringtone, 0x02
  Phonecall, 0x03 Media). Any other value, or a payload that is too short to
  contain the byte at all, is recorded as "Unknown" so that the device record
  always has an AUDIO_TYPE once a stream has been started.
  """
  audio_type_names = {
      0x01: "Ringtone",
      0x02: "Phonecall",
      0x03: "Media",
  }
  audio_type = "Unknown"
  # Check the length up front: reading from an empty payload would otherwise
  # fail inside unpack_data before any "missing value" check could run.
  if len(data) >= 1:
    raw_type, data = unpack_data(data, 1)
    audio_type = audio_type_names.get(raw_type, "Unknown")
  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                    AUDIO_TYPE, audio_type)
93
94
def parse_acl_ha_codec(data, result):
  """Parse the codec byte of an HA audio control "Start" command.

  Records the codec name and sample rate for the device that owns
  result[CONNECTION_HANDLE], then passes the remaining bytes on to
  parse_acl_ha_audio_type.
  """
  # codec id -> (codec name, sample rate); everything else is "Unknown".
  known_codecs = {
      0x01: ("G722", "16KHZ"),
      0x02: ("G722", "24KHZ"),
  }
  codec_id, data = unpack_data(data, 1)
  codec, sample_rate = known_codecs.get(codec_id, ("Unknown", "Unknown"))
  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                    CODEC, codec)
  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                    SAMPLE_RATE, sample_rate)
  parse_acl_ha_audio_type(data, result)
112
113
def parse_acl_ha_audio_control_cmd(data, result):
  """Handle an HA audio control command (0x01 = start, 0x02 = stop).

  A start command marks the device as streaming, stores the packet timestamp
  and continues with the codec/audio type bytes. A stop command marks the
  device as not streaming. Any other command byte is ignored.
  """
  command, data = unpack_data(data, 1)
  if command not in (0x01, 0x02):
    return
  is_start = command == 0x01
  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                    START, is_start)
  if is_start:
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      TIMESTAMP, result[TIMESTAMP])
    parse_acl_ha_codec(data, result)
126
127
128#-----------------------------------------------------------------------
129# Parse ACL Packet
130#-----------------------------------------------------------------------
131
def parse_acl_att_long_uuid(data, result):
  """Look for the Audio Control Point UUID in an ATT response payload.

  Expected layout of data:
    len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) +
    attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes
  When the 128-bit UUID matches AUDIO_CONTROL_POINT_UUID, attr_handle is
  recorded for the device that owns result[CONNECTION_HANDLE]. Shorter
  payloads are ignored.
  """
  if len(data) < 22:
    return
  # attr_handle is little-endian and follows len, start_attr_handle and
  # properties (4 bytes in total).
  attr_handle = struct.unpack_from("<H", data, 4)[0]
  # The UUID is stored least-significant byte first; reverse it before
  # comparing. bytearray yields integers on both Python 2 and Python 3.
  uuid_bytes = bytearray(data[6:22])
  long_uuid = "".join("{0:02x}".format(b) for b in reversed(uuid_bytes))
  # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle.
  if long_uuid == AUDIO_CONTROL_POINT_UUID:
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      AUDIO_CONTROL_ATTR_HANDLE, attr_handle)
150
151
def parse_acl_opcode(data, result):
  """Dispatch an ATT PDU according to its opcode and direction.

  Sent Write Requests (0x12) addressed to the audio control attribute handle
  are parsed as HA audio control commands. Received Read By Type Responses
  (0x09) are scanned for the Audio Control Point UUID. Everything else is
  ignored.
  """
  # opcode (1 byte)
  if not data:
    return
  opcode, data = unpack_data(data, 1)

  if result[IS_SENT]:
    # Only a write request with a complete 2-byte attribute handle matters.
    if opcode != 0x12 or len(data) < 2:
      return
    attr_handle, data = unpack_data(data, 2)
    expected_handle = get_audio_control_attr_handle(result[CONNECTION_HANDLE])
    if attr_handle == expected_handle:
      parse_acl_ha_audio_control_cmd(data, result)
  elif opcode == 0x09:
    parse_acl_att_long_uuid(data, result)
170
171
def parse_acl_handle(data, result):
  """Parse the ACL/L2CAP header and route the payload.

  Expected header:
    connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes)
    + channel_id (2 bytes) = 8 bytes
  Channels up to 0x003F are treated as ATT traffic. Sent frames on channels
  0x0040-0x007F are treated as "CoC Data Packets" carrying audio, provided
  the SDU length matches the PDU length. Too-short frames are ignored.
  """
  if len(data) < 8:
    return
  connection_handle, data = unpack_data(data, 2)
  # The upper 4 bits of the field are flags, not part of the handle.
  connection_handle &= 0x0FFF
  # skip unpack total_len
  data = data[2:]
  pdu, data = unpack_data(data, 2)
  channel_id, data = unpack_data(data, 2)

  if connection_handle > 0x0EFF:
    return

  if channel_id <= 0x003F:
    result[CONNECTION_HANDLE] = connection_handle
    parse_acl_opcode(data, result)
  elif result[IS_SENT] and 0x0040 <= channel_id <= 0x007F:
    # The 2-byte SDU length must be present; without this check a frame
    # with an empty payload would fail while being unpacked.
    if len(data) < 2:
      return
    result[CONNECTION_HANDLE] = connection_handle
    sdu, data = unpack_data(data, 2)
    # pdu counts the 2-byte SDU length field as well as the SDU itself.
    if pdu - 2 == sdu:
      parse_acl_ha_audio_data(data, result)
196
197
198#=======================================================================
199# Parse HCI EVT Function
200#=======================================================================
201
202
def parse_hci_evt_peer_address(data, result):
  """Extract the peer address from an HCI event and register the device.

  data is read as up to three consecutive 6-byte address fields; the first
  one that is not all zeros is used. The address is stored byte-reversed,
  as lowercase hex pairs joined by "_", and then linked to
  result[CONNECTION_HANDLE]. Nothing is recorded if data runs out before a
  field can be read.
  """
  empty_address = ["00"] * 6
  peer_address_list = []
  for _ in range(3):
    if len(data) < 6:
      return
    # bytearray yields integers on both Python 2 and Python 3.
    peer_address_list = ["{0:02x}".format(b) for b in bytearray(data[:6])]
    # Check the address is empty or not.
    if peer_address_list != empty_address:
      break
    peer_address_list = []
    data = data[6:]
  peer_address_list.reverse()
  peer_address = "_".join(peer_address_list)
  update_audio_data("", "", PEER_ADDRESS, peer_address)
  update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE,
                    result[CONNECTION_HANDLE])
224
225
def parse_hci_evt_code(data, result):
  """Detect a successful "LE Enhanced Connection Complete" HCI event.

  Expected header:
    hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte)
    + status (1 byte) + connection_handle (2 bytes) + role (1 byte)
    + address_type (1 byte) = 8 bytes
  For a received event with hci_evt 0x3E, sub_event 0x0A and status 0x00 the
  connection handle is stored in result and the remaining bytes are parsed
  for the peer address.
  """
  if len(data) < 8:
    return
  hci_evt, data = unpack_data(data, 1)
  # data[1:] skips param_total_len.
  sub_event, data = unpack_data(data[1:], 1)
  status, data = unpack_data(data, 1)
  raw_handle, data = unpack_data(data, 2)
  connection_handle = raw_handle & 0x0FFF
  # Skip role and address_type; what follows are the address fields.
  address_fields = data[2:]

  if result[IS_SENT]:
    return
  is_le_enhanced_conn_complete = hci_evt == 0x3E and sub_event == 0x0A
  if is_le_enhanced_conn_complete and status == 0x00 \
      and connection_handle <= 0x0EFF:
    result[CONNECTION_HANDLE] = connection_handle
    parse_hci_evt_peer_address(address_fields, result)
248
249
250#=======================================================================
251# Common Parse Function
252#=======================================================================
253
254
def parse_packet_data(data, result):
  """Route one captured packet by its leading packet type byte.

  Type 0x02 (ACL data) may hold HearingAid audio control or audio data
  packets; type 0x04 (HCI event) may hold the connection complete event.
  Other types, and empty records, are ignored.
  """
  # A zero-length record has no type byte to read.
  if not data:
    return
  packet_type, data = unpack_data(data, 1)
  if packet_type == 0x02:
    # Try to check HearingAid audio control packet and data packet.
    parse_acl_handle(data, result)
  elif packet_type == 0x04:
    # Try to check HearingAid connection successful packet.
    parse_hci_evt_code(data, result)
264
265
def parse_packet(btsnoop_file):
  """Read and process the next packet record of a btsnoop file.

  Record header:
    ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes)
    + drop (4 bytes) + timestamp (8 bytes) = 24 bytes
  followed by include_len bytes of packet data.

  Returns:
    True when a record was consumed (whether or not it was of interest),
    False at end of file or when the record is incomplete or inconsistent.
  """
  packet_header = btsnoop_file.read(24)
  if len(packet_header) != 24:
    return False

  ori_len, include_len, packet_flag, drop, timestamp = \
      struct.unpack(">IIIIq", packet_header)

  # More stored bytes than the packet originally had means a corrupt record.
  if include_len > ori_len:
    return False

  # Only include_len bytes are actually stored in the file.
  packet_data = btsnoop_file.read(include_len)
  if len(packet_data) != include_len:
    return False

  # A truncated record cannot be parsed reliably, but it must not stop the
  # whole capture from being processed: skip it and carry on.
  if ori_len != include_len:
    return True

  if packet_flag != 2 and drop == 0:
    packet_result = {
        IS_SENT: packet_flag == 0,
        TIMESTAMP: convert_time_str(timestamp),
    }
    parse_packet_data(packet_data, packet_result)

  return True
291
292
293#=======================================================================
294# Update and DumpData Function
295#=======================================================================
296
297
def dump_audio_data(data):
  """Write the audio collected for one device to a file.

  The file is named
    [PEER_ADDRESS]-[START TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
  and is placed in the configured output folder (created on demand) or in
  the current directory. Missing name components are written as "Unknown".
  When no audio bytes were collected no file is created and a failure
  message is printed instead.

  Args:
    data: the per-device record from audio_data.
  """
  name_parts = [
      data.get(PEER_ADDRESS, "Unknown"),
      data.get(TIMESTAMP, "Unknown"),
      data.get(AUDIO_TYPE, "Unknown"),
      data.get(SAMPLE_RATE, "Unknown"),
  ]
  file_name = "-".join(name_parts) + "." + data.get(CODEC, "Unknown")
  if folder is not None:
    if not os.path.exists(folder):
      os.makedirs(folder)
    file_name = os.path.join(folder, file_name)

  sys.stdout.write("Start to dump audio file : " + file_name + "\n")
  if AUDIO_DATA_B in data:
    with open(file_name, "wb+") as g722_file:
      g722_file.write(data[AUDIO_DATA_B])
    # Report success only after the file has been closed.
    sys.stdout.write("Finished to dump Audio File: %s\n\n" % file_name)
  else:
    sys.stdout.write("Fail to dump Audio File: %s\n" % file_name)
    sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")
320
321
def update_audio_data(relate_key, relate_value, key, value):
  """Record dump-related information in the global audio_data dict.

  audio_data = {
    PEER_ADDRESS:{
      PEER_ADDRESS: PEER_ADDRESS,
      CONNECTION_HANDLE: CONNECTION_HANDLE,
      AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
      START: True or False,
      TIMESTAMP: START_TIMESTAMP,
      CODEC: CODEC,
      SAMPLE_RATE: SAMPLE_RATE,
      AUDIO_TYPE: AUDIO_TYPE,
      AUDIO_DATA_B: AUDIO_DATA_B
    },
    PEER_ADDRESS_2:{ ...same keys... }
  }

  Args:
    relate_key: key used to find the device record(s) to update; ignored
      when key is PEER_ADDRESS.
    relate_value: value that relate_key must have in a matching record.
    key: the field to set. PEER_ADDRESS registers (or resets) a device,
      START dumps any running stream and clears the per-stream fields,
      AUDIO_DATA_B appends audio bytes while a stream is started.
    value: the new value for key.
  """
  if key == PEER_ADDRESS:
    if value in audio_data:
      # Dump audio data and clear previous data.
      update_audio_data(key, value, START, False)
      # Extra clear CONNECTION_HANDLE due to new connection create.
      audio_data[value].pop(CONNECTION_HANDLE, None)
    else:
      audio_data[value] = {key: value}
    return

  # Fields that belong to a single stream and are reset on every start/stop.
  per_stream_keys = (START, TIMESTAMP, CODEC, SAMPLE_RATE, AUDIO_TYPE,
                     AUDIO_DATA_B)
  for device in audio_data.values():
    if relate_key not in device or device[relate_key] != relate_value:
      continue
    if key == START:
      if device.get(START):
        dump_audio_data(device)
      # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
      # AUDIO_CONTROL_ATTR_HANDLE.
      for stream_key in per_stream_keys:
        device.pop(stream_key, None)
    elif key == AUDIO_DATA_B:
      if not device.get(START):
        # Audio doesn't start, don't record.
        return
      # Append in place; re-concatenating the whole buffer for every packet
      # would copy all previously collected audio each time.
      device.setdefault(AUDIO_DATA_B, bytearray())
      device[AUDIO_DATA_B] += value
      continue
    device[key] = value
386
387
388#=======================================================================
389# Tool Function
390#=======================================================================
391
392
def get_audio_control_attr_handle(connection_handle):
  """Return the audio control attribute handle to use for a connection.

  Order of preference: the handle forced on the command line, the handle
  recorded in audio_data for the device with this connection handle, and
  finally default_audio_control_attr_handle.
  """
  # If force_audio_control_attr_handle is set, will use it first.
  if force_audio_control_attr_handle is not None:
    return force_audio_control_attr_handle

  # Try to check the audio_control_attr_handle is record into audio_data.
  for device in audio_data.values():
    if (CONNECTION_HANDLE in device
        and device[CONNECTION_HANDLE] == connection_handle
        and AUDIO_CONTROL_ATTR_HANDLE in device):
      return device[AUDIO_CONTROL_ATTR_HANDLE]

  # Return default attr_handle if audio_data doesn't record it.
  return default_audio_control_attr_handle
408
409
def unpack_data(data, byte):
  """Read an unsigned integer from the front of data.

  Args:
    data: byte string to read from.
    byte: number of bytes to consume. 1 reads a single byte, 2 reads a
      little-endian 16-bit value; any other size only skips the bytes.

  Returns:
    A (value, rest) tuple. value is the integer read, None when data is too
    short to hold it, or "" for an unsupported size. rest is data with the
    first `byte` bytes removed.
  """
  if byte == 1:
    # unpack_from reads straight from the buffer, so it behaves the same for
    # Python 2 str and Python 3 bytes.
    value = struct.unpack_from(">B", data, 0)[0] if len(data) >= 1 else None
  elif byte == 2:
    value = struct.unpack_from("<H", data, 0)[0] if len(data) >= 2 else None
  else:
    value = ""
  return value, data[byte:]
420
421
def convert_time_str(timestamp):
  """Convert a btsnoop packet timestamp to a string usable in file names.

  Args:
    timestamp: packet time in microseconds, as stored in the record header.

  Returns:
    Local time formatted as "MM_DD__HH_MM_SS_uuuuuu", where the last field
    is the zero-padded microsecond part.
  """
  # btsnoop timestamps count microseconds from midnight, January 1st, 0 AD;
  # this is the offset of the Unix epoch on that scale. Values below the
  # offset are taken to be relative to the Unix epoch already.
  btsnoop_epoch_delta = 0x00dcddb30f2f8000
  timestamp = int(timestamp)
  if timestamp >= btsnoop_epoch_delta:
    timestamp -= btsnoop_epoch_delta
  # Integer arithmetic keeps the microsecond field exact; a float cannot
  # represent timestamps of this magnitude to microsecond precision.
  seconds, microseconds = divmod(timestamp, SEC_CONVERT)
  time_str = time.strftime("%m_%d__%H_%M_%S", time.localtime(seconds))
  ms_str = "{0:06}".format(microseconds)
  return time_str + "_" + ms_str
431
432
def set_config():
  """Parse the command line, apply the options and return the input path.

  Side effects: sets the global output folder and forced audio control attr
  handle when given, and pre-registers a fake device in audio_data for each
  -c1/-c2 connection handle (already marked as started when --no-start is
  "true").

  Returns:
    The path of the BTSNOOP file to parse.

  Invalid arguments are reported through ArgumentParser.error, which prints
  the usage message and terminates the program.
  """
  global folder
  global force_audio_control_attr_handle

  argv_parser = argparse.ArgumentParser(
      description="Extracts Hearing Aid audio data from BTSNOOP.")
  argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.")
  argv_parser.add_argument("-f", "--folder", help="select output folder.",
                           dest="folder")
  argv_parser.add_argument("-c1", "--connection-handle1",
                           help="set a fake connection handle 1 to capture "
                           "audio dump.", dest="connection_handle1", type=int)
  argv_parser.add_argument("-c2", "--connection-handle2",
                           help="set a fake connection handle 2 to capture "
                           "audio dump.", dest="connection_handle2", type=int)
  argv_parser.add_argument("-ns", "--no-start", help="No audio 'Start' cmd is "
                           "needed before extracting audio data.",
                           dest="no_start", default="False")
  argv_parser.add_argument("-dc", "--default-codec", help="set a default "
                           "codec.", dest="codec", default="G722")
  argv_parser.add_argument("-a", "--attr-handle",
                           help="force to select audio control attr handle.",
                           dest="audio_control_attr_handle", type=int)
  arg = argv_parser.parse_args()

  if arg.folder is not None:
    folder = arg.folder

  if arg.connection_handle1 is not None \
      and arg.connection_handle1 == arg.connection_handle2:
    argv_parser.error("connection_handle1 can't be same with "
                      "connection_handle2")

  no_start = arg.no_start.lower()
  if no_start not in ("true", "false"):
    argv_parser.error("-ns/--no-start arg is invalid, it should be true/false.")

  # Register a placeholder device for every fake connection handle given.
  for fake_handle in (arg.connection_handle1, arg.connection_handle2):
    if fake_handle is None:
      continue
    fake_name = "ConnectionHandle" + str(fake_handle)
    update_audio_data("", "", PEER_ADDRESS, fake_name)
    update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, fake_handle)
    if no_start == "true":
      # Without a "Start" cmd the stream parameters are not known.
      update_audio_data(PEER_ADDRESS, fake_name, START, True)
      update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
      update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")

  if arg.audio_control_attr_handle is not None:
    force_audio_control_attr_handle = arg.audio_control_attr_handle

  if not os.path.isfile(arg.BTSNOOP):
    argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP)
  return arg.BTSNOOP
503
504
def main():
  """Parse the btsnoop file named on the command line and dump its audio.

  Validates the file header (identification pattern, version 1, data link
  type 1002), processes packet records until parse_packet reports that no
  further record can be read, and finally dumps every stream that is still
  marked as started. Exits with status 1 when the header is not valid.
  """
  btsnoop_file_name = set_config()

  with open(btsnoop_file_name, "rb") as btsnoop_file:
    # The file is opened in binary mode, so compare against a bytes literal.
    identification = btsnoop_file.read(8)
    if identification != b"btsnoop\0":
      sys.stderr.write(
          "Check identification fail. It is not correct btsnoop file.")
      sys.exit(1)

    # A file cut off inside the header cannot be unpacked; report it the
    # same way as a wrong version or data link type.
    header = btsnoop_file.read(4 + 4)
    ver, data_link = (None, None)
    if len(header) == 8:
      ver, data_link = struct.unpack(">II", header)
    if (ver != 1) or (data_link != 1002):
      sys.stderr.write(
          "Check ver or dataLink fail. It is not correct btsnoop file.")
      sys.exit(1)

    while parse_packet(btsnoop_file):
      pass

  # Streams that never received a stop command are dumped at the end.
  for device in audio_data.values():
    if device.get(START, False):
      dump_audio_data(device)


if __name__ == "__main__":
  main()
532