#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import asyncio
import contextlib
import csv
import itertools
import os
import re
import struct
import sys
import tempfile
import time
import zipfile
from typing import Any, Callable, Dict, Generic, Iterable, List, NamedTuple, TextIO, Tuple, TypeVar, Optional, Union

# Include generated protos.
dir_name = os.path.dirname(os.path.realpath(__file__))
sys.path.append(dir_name + "/generated")

from TraceFile_pb2 import *

# Debug flag read by _debug_print(); main() overwrites it from --debug.
# Defined at module level so the helpers below can be called before main().
_debug = False


def parse_options(argv: Optional[List[str]] = None):
    """Parse command line arguments and return an argparse Namespace object.

    Args:
        argv: Argument list to parse; when None, argparse falls back to
            sys.argv[1:].
    """
    parser = argparse.ArgumentParser(description="Compile a TraceFile.proto from a manual text file.")
    # argparse considers args starting with - and -- optional in --help, even though required=True.
    # by using a named argument group --help will clearly say that it's required instead of optional.
    # (No required arguments are currently registered in this group.)
    required_named = parser.add_argument_group('required named arguments')

    # optional arguments
    # use a group here to get the required arguments to appear 'above' the optional arguments in help.
    optional_named = parser.add_argument_group('optional named arguments')
    optional_named.add_argument('-opb', '--output-proto-binary', dest='output_proto_binary', action='store', help='Write binary proto output to file.')
    optional_named.add_argument('-pm', '--pinlist-meta', dest='pinlist_meta', action='store', help='Path to pinlist.meta (default=none) binary file.')
    optional_named.add_argument('-pmp', '--pinlist-meta-parent', dest='pinlist_meta_parent', action='store', help='Device path that the pinlist.meta applies to (e.g. /data/.../somefile.apk)')
    optional_named.add_argument('-i', '--input', dest='input', action='store', help='Input text file (default stdin).')
    optional_named.add_argument('-zp', '--zip_path', dest='zip_path', action='append', help='Directory containing zip files.')
    optional_named.add_argument('-d', '--debug', dest='debug', action='store_true', help='Add extra debugging output')
    optional_named.add_argument('-ot', '--output-text', dest='output_text', action='store', help='Output text file (default stdout).')

    return parser.parse_args(argv)


# TODO: refactor this with a common library file with analyze_metrics.py
def _debug_print(*args, **kwargs):
    """Print the args to sys.stderr if the --debug/-d flag was passed in."""
    if _debug:
        print(*args, **kwargs, file=sys.stderr)


class BadInputError(Exception):
    """Raised when a line of the input text file does not have the expected form."""


# One byte range of one file: (path, start offset, length, free-form remark).
InputRecord = NamedTuple('InputRecord', [('filepath', str), ('offset', int), ('length', int), ('remark', str)])

# '<filepath> <offset> <length>'; used with match(), so anything after the
# third field (for example the '#remark' written by write_text_file_output)
# is ignored.
_INPUT_LINE_RE = re.compile(r"([^\s]+)\s+(\d+)\s+(\d+)")


def find_zip_in_paths(original_name, zip_paths):
    """Locate the zip file named like `original_name` in one of `zip_paths`.

    Only the basename of `original_name` is used for the lookup, and only the
    immediate entries of each directory are examined (no recursion).

    Args:
        original_name: Path whose basename is searched for.
        zip_paths: Directories to search, in order.

    Returns:
        The joined path of the first directory entry matching the basename.

    Raises:
        ValueError: If no directory contains the file, or if the match is not
            a zip file.
        OSError: If one of `zip_paths` cannot be listed.
    """
    # /foo/bar/bax.zip -> bax.zip
    file_basename = os.path.split(original_name)[1]

    # the file must be located in one of the --zip_path arguments
    matched = None
    for zip_path in zip_paths:
        if file_basename in os.listdir(zip_path):
            matched = os.path.join(zip_path, file_basename)
            break

    if not matched:
        raise ValueError("%s could not be found in any of the --zip_path specified." %(file_basename))

    _debug_print("found zip file ", file_basename, " in ", matched)

    if not zipfile.is_zipfile(matched):
        raise ValueError("%s is not a zip file" %(matched))

    return matched


def handle_zip_entry(input_record, zip_paths):
    """Resolve a 'zipfile!entry' record into a byte range of the zip file.

    Records whose filepath has no '!' separator are returned unchanged.
    Otherwise the zip is looked up on the host via find_zip_in_paths() and the
    record's own offset/length are replaced by the entry's header offset and
    compressed size.

    Args:
        input_record: The InputRecord to inspect.
        zip_paths: Directories searched for the zip file.

    Returns:
        Either `input_record` itself, or a new InputRecord for the zip entry.

    Raises:
        ValueError: If the zip cannot be found, is not a zip file, or does not
            contain the named entry.
    """
    res = re.match("([^!]+)[!](.*)", input_record.filepath)

    if not res:
        return input_record

    # 'foo!bar'
    in_filepath = res[1]   # -> 'foo'
    in_zip_entry = res[2]  # -> 'bar'

    matched = find_zip_in_paths(in_filepath, zip_paths)

    # Use a context manager so the archive handle is released on every path.
    with zipfile.ZipFile(matched) as zip_file:
        try:
            zip_info = zip_file.getinfo(in_zip_entry)
        except KeyError as err:
            raise ValueError("%s is not an item in the zip file %s" %(in_zip_entry, matched)) from err

    # TODO: do we also need to add header size to this?
    in_offset = zip_info.header_offset

    # TODO: if a range is specified, use that instead.
    in_length = zip_info.compress_size

    return InputRecord(in_filepath, in_offset, in_length, 'zip entry (%s)' %(in_zip_entry))


def parse_input_file(input: Iterable[str], zip_paths: List[str]) -> Iterable[InputRecord]:
    """Yield an InputRecord for every meaningful line of the text input.

    Blank lines and lines whose first non-blank character is '#' are skipped.
    Every other line must start with '<filepath> <offset> <length>'; the
    resulting record is passed through handle_zip_entry().

    Args:
        input: Lines of text (e.g. an open text file).
        zip_paths: Directories searched when a filepath names a zip entry.

    Raises:
        BadInputError: If a line does not match the expected form.
    """
    for line in input:
        line = line.strip()

        _debug_print("Line = ", line)
        if not line:
            _debug_print("  skip empty line", line)
            continue
        elif line[0] == "#":
            _debug_print("  skip commented line", line)
            continue

        res = _INPUT_LINE_RE.match(line)
        if not res:
            raise BadInputError("Expected input of form: <str:filepath> <int:offset> <int:length>")

        in_filepath = res[1]
        in_offset = int(res[2])
        in_length = int(res[3])

        yield handle_zip_entry(InputRecord(in_filepath, in_offset, in_length, 'regular file'), zip_paths)


# format:
#   (<big_endian(i32):file_offset> <big_endian(i32):range_length>)+
PIN_META_FORMAT = ">ii"
PIN_META_READ_SIZE = struct.calcsize(PIN_META_FORMAT)


def parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths):
    """Yield an InputRecord for every (offset, length) pair in a pinlist.meta.

    The APK on the host is located from the package name embedded in
    `pinlist_meta_parent`; its size is used to clamp ranges that run past the
    end of the file. Clamp warnings are written to sys.stderr so that they do
    not mix with the text output, which defaults to stdout.

    Args:
        pin_meta_file: Binary file object positioned at the first record, or a
            falsy value to yield nothing.
        pinlist_meta_parent: Device path the pinlist applies to; it is used as
            the filepath of every record yielded.
        zip_paths: Directories searched for the APK.

    Raises:
        ValueError: If `pinlist_meta_parent` does not have the expected shape,
            or the APK cannot be found in `zip_paths`.
        struct.error: If the file ends with a partial record.
    """
    if not pin_meta_file:
        return

    # '/data/app/com.google.android.GoogleCamera-aNQhzSznf4h_bvJ_MRbweQ==/base.apk'
    #  -> 'com.google.android.GoogleCamera'
    package_name_match = re.match('/.*/(.*)-.*=/base.apk', pinlist_meta_parent)

    if not package_name_match:
        raise ValueError("%s did not contain the <packagename>.apk" %(pinlist_meta_parent))

    package_name = package_name_match[1]
    # "com.google.android.GoogleCamera" -> "GoogleCamera.apk"
    apk_name = package_name.split(".")[-1] + ".apk"

    path_to_zip_on_host = find_zip_in_paths(apk_name, zip_paths)
    apk_file_size = os.path.getsize(path_to_zip_on_host)
    _debug_print("APK path '%s' file size '%d'" %(path_to_zip_on_host, apk_file_size))

    while True:
        data = pin_meta_file.read(PIN_META_READ_SIZE)

        if not data:
            break

        (pin_offset, pin_length) = struct.unpack(PIN_META_FORMAT, data)  # (offset, length)

        remark = 'regular file (pinlist.meta)'

        # Negative when the range starts beyond the end of the APK.
        remaining_size = apk_file_size - pin_offset
        if remaining_size < 0:
            print("WARNING: Clamp entry (%d, %d), offset too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size), file=sys.stderr)

            pin_length = pin_length + remaining_size
            pin_offset = pin_offset + remaining_size

            if pin_offset < 0:
                pin_offset = 0

            remark += '[clamped.offset]'

        pin_last_offset = pin_offset + pin_length
        # Negative when the range ends beyond the end of the APK.
        remaining_size = apk_file_size - pin_last_offset

        if remaining_size < 0:
            print("WARNING: Clamp entry (%d, %d), length too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size), file=sys.stderr)
            pin_length = pin_length + remaining_size

            remark += '[clamped.length]'

        yield InputRecord(pinlist_meta_parent, pin_offset, pin_length, remark)


def write_text_file_output(input_records: Iterable[InputRecord], output_text_file):
    """Write each record as a '<filepath> <offset> <length> #<remark>' line.

    Args:
        input_records: Records to write.
        output_text_file: Writable text file object.
    """
    for rec in input_records:
        output_text_file.write("%s %d %d #%s\n" %(rec.filepath, rec.offset, rec.length, rec.remark))


def build_trace_file(input_records: Iterable[InputRecord]) -> TraceFile:
    """Build a TraceFile proto from the given records.

    Each distinct filepath gets exactly one index entry, with ids assigned
    from 0 in order of first appearance. Every record adds one list entry
    that refers to its file's index id.

    Args:
        input_records: Records to convert.

    Returns:
        The populated TraceFile message.
    """
    trace_file = TraceFile()
    trace_file_index = trace_file.index

    file_id_map = {}  # filename -> id

    for rec in input_records:
        filename = rec.filepath

        file_id = file_id_map.get(filename)
        # Compare against None explicitly: 0 is a valid id, and a truthiness
        # test would re-register the first file on every later occurrence.
        if file_id is None:
            file_id = len(file_id_map)
            file_id_map[filename] = file_id

            file_index_entry = trace_file_index.entries.add()
            file_index_entry.id = file_id
            file_index_entry.file_name = filename

        # already in the file index, add the file entry.
        file_entry = trace_file.list.entries.add()
        file_entry.index_id = file_id
        file_entry.file_length = rec.length
        file_entry.file_offset = rec.offset

    return trace_file


def main():
    """Run the compiler from the command line.

    Returns:
        0 on success, 1 if --pinlist-meta and --pinlist-meta-parent were not
        given together.
    """
    global _debug

    options = parse_options()
    _debug = options.debug
    _debug_print("parsed options: ", options)

    # Validate the options before opening anything, so that a usage error
    # neither truncates an output file nor leaves open handles behind.
    pinlist_meta_parent = options.pinlist_meta_parent
    if (pinlist_meta_parent is None) != (not options.pinlist_meta):
        print("Options must be used together: --pinlist-meta and --pinlist-meta-parent", file=sys.stderr)
        return 1

    # Every file opened here is closed when the stack exits, including when
    # parsing raises.
    with contextlib.ExitStack() as stack:
        if not options.input:
            input_file = sys.stdin
            _debug_print("input = stdin")
        else:
            input_file = stack.enter_context(open(options.input))
            _debug_print("input = (file)", options.input)

        if not options.output_proto_binary:
            output_proto_file = None
        else:
            output_proto_file = stack.enter_context(open(options.output_proto_binary, 'wb'))
        _debug_print("output_proto_binary = ", output_proto_file)

        if options.pinlist_meta:
            pin_meta_file = stack.enter_context(open(options.pinlist_meta, 'rb'))
        else:
            pin_meta_file = None

        if not options.output_text:
            output_text_file = sys.stdout
            _debug_print("output = stdout")
        else:
            output_text_file = stack.enter_context(open(options.output_text, 'w'))
            _debug_print("output = (file)", options.output_text)

        zip_paths = options.zip_path or []

        # pinlist.meta records come first, followed by the text input records.
        input_records = list(parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths))
        input_records = input_records + list(parse_input_file(input_file, zip_paths))

        for p in input_records:
            _debug_print(p)

        write_text_file_output(input_records, output_text_file)
        # Flush rather than close: the stream may be sys.stdout, which this
        # script does not own.
        output_text_file.flush()

        out_proto = build_trace_file(input_records)

        if output_proto_file:
            output_proto_file.write(out_proto.SerializeToString())

    return 0


if __name__ == '__main__':
    sys.exit(main())