• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2019, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import argparse
18import asyncio
19import csv
20import itertools
21import os
22import re
23import struct
24import sys
25import tempfile
26import time
27import zipfile
28from typing import Any, Callable, Dict, Generic, Iterable, List, NamedTuple, TextIO, Tuple, TypeVar, Optional, Union
29
30# Include generated protos.
31dir_name = os.path.dirname(os.path.realpath(__file__))
32sys.path.append(dir_name + "/generated")
33
34from TraceFile_pb2 import *
35
36
def parse_options(argv: List[str] = None):
  """Build the command line parser and parse argv into an argparse Namespace.

  When argv is None, argparse reads the arguments from sys.argv[1:].
  """
  parser = argparse.ArgumentParser(description="Compile a TraceFile.proto from a manual text file.")

  # argparse lists anything starting with - or -- as optional in --help, even with required=True.
  # Named groups let --help state clearly what is required; the 'required' group is created
  # first so that it would be listed above the optional one. No required argument is
  # registered at the moment, so that group is empty.
  parser.add_argument_group('required named arguments')
  optional_group = parser.add_argument_group('optional named arguments')

  # One row per option: (short flag, long flag, dest, action, help text).
  option_table = (
    ('-opb', '--output-proto-binary', 'output_proto_binary', 'store', 'Write binary proto output to file.'),
    ('-pm', '--pinlist-meta', 'pinlist_meta', 'store', 'Path to pinlist.meta (default=none) binary file.'),
    ('-pmp', '--pinlist-meta-parent', 'pinlist_meta_parent', 'store', 'Device path that the pinlist.meta applies to (e.g. /data/.../somefile.apk)'),
    ('-i', '--input', 'input', 'store', 'Input text file (default stdin).'),
    ('-zp', '--zip_path', 'zip_path', 'append', 'Directory containing zip files.'),
    ('-d', '--debug', 'debug', 'store_true', 'Add extra debugging output'),
    ('-ot', '--output-text', 'output_text', 'store', 'Output text file (default stdout).'),
  )
  for short_flag, long_flag, dest_name, action_name, help_text in option_table:
    optional_group.add_argument(short_flag, long_flag, dest=dest_name, action=action_name, help=help_text)

  return parser.parse_args(argv)
56
# Debug output is off by default; main() overwrites this flag from --debug/-d.
# Defining it at module level keeps _debug_print() a harmless no-op (instead of a
# NameError) when it is called before main() has run.
_debug = False

# TODO: refactor this with a common library file with analyze_metrics.py
def _debug_print(*args, **kwargs):
  """Print the args to sys.stderr if the --debug/-d flag was passed in.

  Takes the same positional and keyword arguments as print(), except `file`,
  which is always sys.stderr.
  """
  if _debug:
    print(*args, **kwargs, file=sys.stderr)
62
class BadInputError(Exception):
  """Raised when an input line is not of the form '<filepath> <offset> <length>'."""
65
class InputRecord(NamedTuple):
  """A (filepath, offset, length) range plus a free-form remark about its origin."""
  filepath: str
  offset: int
  length: int
  remark: str
67
def find_zip_in_paths(original_name, zip_paths):
  """Locate a zip file by its base name inside the --zip_path directories.

  Only the last path component of original_name is used. The directories in
  zip_paths are listed in order and the first entry whose name is exactly that
  component wins.

  Returns the path of the match (directory joined with the entry name).
  Raises ValueError if no directory contains the file, or if the match is not
  a zip file.
  """
  # /foo/bar/bax.zip -> bax.zip
  wanted_name = os.path.basename(original_name)

  # the file must be located in one of the --zip-path arguments.
  # The generator is lazy, so no further directory is listed once a match is found.
  candidates = (
    os.path.join(search_dir, entry)
    for search_dir in zip_paths
    for entry in os.listdir(search_dir)
    if entry == wanted_name
  )
  located = next(candidates, None)

  if not located:
    raise ValueError("%s could not be found in any of the --zip_path specified." %(wanted_name))

  _debug_print("found zip file ", wanted_name, " in ", located)

  if zipfile.is_zipfile(located):
    return located

  raise ValueError("%s is not a zip file" %(located))
91
def handle_zip_entry(input_record, zip_paths):
  """Turn a '<zip file>!<entry>' record into a range inside the zip file.

  A record whose filepath does not have the '<file>!<entry>' form is returned
  unchanged. Otherwise the zip file is looked up by base name in zip_paths and a
  new InputRecord is returned with:
    filepath = the part before the '!'
    offset   = the entry's header_offset inside the zip file
    length   = the entry's compress_size
  The offset and length of the incoming record are not used in that case.

  Raises ValueError if the zip file cannot be found, is not a zip file, or does
  not contain the named entry.
  """
  res = re.match("([^!]+)[!](.*)", input_record.filepath)

  if not res:
    return input_record

                         # 'foo!bar'
  in_filepath = res[1]   # -> 'foo'
  in_zip_entry = res[2]  # -> 'bar'

  matched = find_zip_in_paths(in_filepath, zip_paths)

  # Close the archive as soon as the entry metadata has been read; the ZipInfo
  # object stays usable afterwards.
  with zipfile.ZipFile(matched) as zip_file:
    try:
      zip_info = zip_file.getinfo(in_zip_entry)
    except KeyError as err:
      raise ValueError("%s is not an item in the zip file %s" %(in_zip_entry, matched)) from err

  # TODO: do we also need to add header size to this?
  in_offset = zip_info.header_offset

  # TODO: if a range is specified, use that instead.
  in_length = zip_info.compress_size

  return InputRecord(in_filepath, in_offset, in_length, 'zip entry (%s)' %(in_zip_entry))
119
def parse_input_file(input: Iterable[str], zip_paths: List[str]) -> Iterable[InputRecord]:
  """Yield one InputRecord per '<filepath> <offset> <length>' line of input.

  Lines are stripped first; empty lines and lines starting with '#' are skipped.
  Anything after the length (e.g. a trailing '#remark') is ignored. Each record
  is passed through handle_zip_entry(), so '<zip file>!<entry>' paths are
  resolved against zip_paths.

  Raises BadInputError on a line that does not start with the expected form.
  """
  # Raw string: in a plain literal '\s' and '\d' are invalid string escapes.
  # Compiled once here rather than looked up again for every line.
  record_pattern = re.compile(r"([^\s]+)\s+(\d+)\s+(\d+)")

  for line in input:
    line = line.strip()

    _debug_print("Line = ", line)
    if not line:
      _debug_print("  skip empty line", line)
      continue
    if line.startswith("#"):
      _debug_print("  skip commented line", line)
      continue

    res = record_pattern.match(line)
    if not res:
      raise BadInputError("Expected input of form: <str:filepath> <int:offset> <int:length>")

    in_filepath = res[1]
    in_offset = int(res[2])
    in_length = int(res[3])

    yield handle_zip_entry(InputRecord(in_filepath, in_offset, in_length, 'regular file'), zip_paths)
141
# pinlist.meta layout: one or more fixed-size records, each of the form
#   <big_endian(i32):file_offset> <big_endian(i32):range_length>
PIN_META_FORMAT = ">ii"
# Number of bytes occupied by one packed record.
PIN_META_READ_SIZE = struct.Struct(PIN_META_FORMAT).size
146
def parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths):
  """Yield an InputRecord for every (offset, length) entry of a pinlist.meta file.

  Args:
    pin_meta_file: binary file object to read the entries from; a falsy value
      yields nothing.
    pinlist_meta_parent: device path the pinlist applies to. It must match
      '/.../<package>-<suffix>=/base.apk' and becomes the filepath of every record.
    zip_paths: host directories searched for '<last package component>.apk'.
      The size of that file is the bound the ranges are clamped to.

  A range that reaches past the end of the host APK is clamped; a warning is
  printed to stderr and the record's remark is tagged with '[clamped.offset]'
  and/or '[clamped.length]'. The yielded length is never negative.

  Raises ValueError if pinlist_meta_parent does not match the expected form, or
  if the host APK cannot be found in zip_paths.
  """
  if not pin_meta_file:
    return

  # '/data/app/com.google.android.GoogleCamera-aNQhzSznf4h_bvJ_MRbweQ==/base.apk'
  #  -> 'com.google.android.GoogleCamera'
  package_name_match = re.match('/.*/(.*)-.*=/base.apk', pinlist_meta_parent)

  if not package_name_match:
    raise ValueError("%s did not contain the <packagename>.apk" %(pinlist_meta_parent))

  package_name = package_name_match[1]
  # "com.google.android.GoogleCamera" -> "GoogleCamera.apk"
  apk_name = package_name.split(".")[-1] + ".apk"

  path_to_zip_on_host = find_zip_in_paths(apk_name, zip_paths)
  apk_file_size = os.path.getsize(path_to_zip_on_host)
  _debug_print("APK path '%s' file size '%d'" %(path_to_zip_on_host, apk_file_size))

  while True:
    data = pin_meta_file.read(PIN_META_READ_SIZE)

    if not data:
      break

    (pin_offset, pin_length) = struct.unpack(PIN_META_FORMAT, data)  # (offset, length)

    remark = 'regular file (pinlist.meta)'

    # Warnings go to stderr: stdout is the default destination of the record
    # text output and must not be mixed with diagnostics.
    remaining_size = apk_file_size - pin_offset
    if remaining_size < 0:
      print("WARNING: Clamp entry (%d, %d), offset too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size), file=sys.stderr)

      # Pull the start back to the end of the file and shrink the length by the
      # same amount.
      pin_length = pin_length + remaining_size
      pin_offset = pin_offset + remaining_size

      if pin_offset < 0:
        pin_offset = 0

      remark += '[clamped.offset]'

    pin_last_offset = pin_offset + pin_length
    remaining_size = apk_file_size - pin_last_offset

    if remaining_size < 0:
      print("WARNING: Clamp entry (%d, %d), length too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size), file=sys.stderr)
      pin_length = pin_length + remaining_size

      remark += '[clamped.length]'

    # Shrinking the length above can push it below zero when the whole range
    # lies past the end of the file; a negative length is never a valid range.
    if pin_length < 0:
      pin_length = 0

    yield InputRecord(pinlist_meta_parent, pin_offset, pin_length, remark)
201
def write_text_file_output(input_records: Iterable[InputRecord], output_text_file):
  """Write every record to output_text_file as a '<filepath> <offset> <length> #<remark>' line."""
  for record in input_records:
    fields = (record.filepath, record.offset, record.length, record.remark)
    output_text_file.write("%s %d %d #%s\n" % fields)
205
def build_trace_file(input_records: Iterable[InputRecord]) -> TraceFile:
  """Build a TraceFile proto from the given records.

  Every distinct filepath gets exactly one entry in trace_file.index, with ids
  assigned 0, 1, 2, ... in order of first appearance. Every record adds one
  entry to trace_file.list that refers to its file through index_id and carries
  the record's offset and length.
  """
  trace_file = TraceFile()
  trace_file_index = trace_file.index

  file_id_map = {} # filename -> id

  for rec in input_records:
    filename = rec.filepath

    file_id = file_id_map.get(filename)
    # Compare against None rather than truthiness: the first file gets id 0,
    # which is falsy and would otherwise be treated as "not seen yet" each time.
    if file_id is None:
      # Ids are handed out sequentially, so the next free id is the map size.
      file_id = len(file_id_map)
      file_id_map[filename] = file_id

      file_index_entry = trace_file_index.entries.add()
      file_index_entry.id = file_id
      file_index_entry.file_name = filename

    # the file is in the index at this point, add the file entry.
    file_entry = trace_file.list.entries.add()
    file_entry.index_id = file_id
    file_entry.file_length = rec.length
    file_entry.file_offset = rec.offset

  return trace_file
239
def main():
  """Command line entry point.

  Reads records from pinlist.meta (if given) and from the input text, writes
  them as text to --output-text or stdout, and, if --output-proto-binary was
  given, writes the serialized TraceFile proto to that file.

  Returns the process exit code: 0 on success, 1 when only one of
  --pinlist-meta / --pinlist-meta-parent was passed.
  """
  # Imported here so the function brings its own dependency into scope.
  import contextlib

  global _debug

  options = parse_options()
  _debug = options.debug
  _debug_print("parsed options: ", options)

  pinlist_meta_parent = options.pinlist_meta_parent
  # Validate before opening anything, so a usage error cannot truncate an
  # existing output file. The message goes to stderr because stdout is the
  # default destination of the record text output.
  if (pinlist_meta_parent is None) != (not options.pinlist_meta):
    print("Options must be used together: --pinlist-meta and --pinlist-meta-parent", file=sys.stderr)
    return 1

  zip_paths = options.zip_path or []

  # The ExitStack closes whichever input files were actually opened, including
  # when parsing raises.
  with contextlib.ExitStack() as open_inputs:
    if options.input:
      input_file = open_inputs.enter_context(open(options.input))
      _debug_print("input = (file)", options.input)
    else:
      input_file = sys.stdin
      _debug_print("input = stdin")

    if options.pinlist_meta:
      pin_meta_file = open_inputs.enter_context(open(options.pinlist_meta, 'rb'))
    else:
      pin_meta_file = None

    input_records = list(parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths))
    input_records.extend(parse_input_file(input_file, zip_paths))

  for p in input_records:
    _debug_print(p)

  # Output files are only opened once all input has been parsed, so bad input
  # does not leave an empty or truncated output file behind.
  if options.output_text:
    _debug_print("output = (file)", options.output_text)
    with open(options.output_text, 'w') as output_text_file:
      write_text_file_output(input_records, output_text_file)
  else:
    _debug_print("output = stdout")
    # sys.stdout is deliberately left open; it is not ours to close.
    write_text_file_output(input_records, sys.stdout)

  out_proto = build_trace_file(input_records)

  _debug_print("output_proto_binary = ", options.output_proto_binary)
  if options.output_proto_binary:
    with open(options.output_proto_binary, 'wb') as output_proto_file:
      output_proto_file.write(out_proto.SerializeToString())

  return 0

if __name__ == '__main__':
  sys.exit(main())
298