1#!/usr/bin/env python 2# 3# Copyright (C) 2009 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Check the signatures of all APKs in a target_files .zip file. With 19-c, compare the signatures of each package to the ones in a separate 20target_files (usually a previously distributed build for the same 21device) and flag any changes. 22 23Usage: check_target_file_signatures [flags] target_files 24 25 -c (--compare_with) <other_target_files> 26 Look for compatibility problems between the two sets of target 27 files (eg., packages whose keys have changed). 28 29 -l (--local_cert_dirs) <dir,dir,...> 30 Comma-separated list of top-level directories to scan for 31 .x509.pem files. Defaults to "vendor,build". Where cert files 32 can be found that match APK signatures, the filename will be 33 printed as the cert name, otherwise a hash of the cert plus its 34 subject string will be printed instead. 35 36 -t (--text) 37 Dump the certificate information for both packages in comparison 38 mode (this output is normally suppressed). 39 40""" 41 42from __future__ import print_function 43 44import logging 45import os 46import os.path 47import re 48import subprocess 49import sys 50import zipfile 51 52import common 53 54if sys.hexversion < 0x02070000: 55 print("Python 2.7 or newer is required.", file=sys.stderr) 56 sys.exit(1) 57 58 59logger = logging.getLogger(__name__) 60 61# Work around a bug in Python's zipfile module that prevents opening of zipfiles 62# if any entry has an extra field of between 1 and 3 bytes (which is common with 63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which 64# contains the bug) with an empty version (since we don't need to decode the 65# extra field anyway). 66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and 67# Python 3.5.0 alpha 1. 68 69 70class MyZipInfo(zipfile.ZipInfo): 71 def _decodeExtra(self): 72 pass 73 74 75zipfile.ZipInfo = MyZipInfo 76 77 78OPTIONS = common.OPTIONS 79 80OPTIONS.text = False 81OPTIONS.compare_with = None 82OPTIONS.local_cert_dirs = ("vendor", "build") 83 84PROBLEMS = [] 85PROBLEM_PREFIX = [] 86 87 88def AddProblem(msg): 89 logger.error(msg) 90 PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg) 91 92 93def Push(msg): 94 PROBLEM_PREFIX.append(msg) 95 96 97def Pop(): 98 PROBLEM_PREFIX.pop() 99 100 101def Banner(msg): 102 print("-" * 70) 103 print(" ", msg) 104 print("-" * 70) 105 106 107def GetCertSubject(cert): 108 p = common.Run(["openssl", "x509", "-inform", "DER", "-text"], 109 stdin=subprocess.PIPE, 110 stdout=subprocess.PIPE, 111 universal_newlines=False) 112 out, err = p.communicate(cert) 113 if err and not err.strip(): 114 return "(error reading cert subject)" 115 for line in out.decode().split("\n"): 116 line = line.strip() 117 if line.startswith("Subject:"): 118 return line[8:].strip() 119 return "(unknown cert subject)" 120 121 122class CertDB(object): 123 124 def __init__(self): 125 self.certs = {} 126 127 def Add(self, cert_digest, subject, name=None): 128 if cert_digest in self.certs: 129 if name: 130 self.certs[cert_digest] = self.certs[cert_digest] + "," + name 131 else: 132 if name is None: 133 name = "unknown cert %s (%s)" % (cert_digest[:12], subject) 134 self.certs[cert_digest] = name 135 136 def Get(self, cert_digest): 137 """Return the name for a given cert digest.""" 138 return self.certs.get(cert_digest, None) 139 140 def FindLocalCerts(self): 141 to_load = [] 142 for top in OPTIONS.local_cert_dirs: 143 for dirpath, _, filenames in os.walk(top): 144 certs = [os.path.join(dirpath, i) 145 for i in filenames if i.endswith(".x509.pem")] 146 if certs: 147 to_load.extend(certs) 148 149 for i in to_load: 150 with open(i) as f: 151 cert = common.ParseCertificate(f.read()) 152 name, _ = os.path.splitext(i) 153 name, _ = os.path.splitext(name) 154 155 cert_sha1 = common.sha1(cert).hexdigest() 156 cert_subject = GetCertSubject(cert) 157 self.Add(cert_sha1, cert_subject, name) 158 159 160ALL_CERTS = CertDB() 161 162 163def CertFromPKCS7(data, filename): 164 """Read the cert out of a PKCS#7-format file (which is what is 165 stored in a signed .apk).""" 166 Push(filename + ":") 167 try: 168 p = common.Run(["openssl", "pkcs7", 169 "-inform", "DER", 170 "-outform", "PEM", 171 "-print_certs"], 172 stdin=subprocess.PIPE, 173 stdout=subprocess.PIPE, 174 universal_newlines=False) 175 out, err = p.communicate(data) 176 if err and not err.strip(): 177 AddProblem("error reading cert:\n" + err.decode()) 178 return None 179 180 cert = common.ParseCertificate(out.decode()) 181 if not cert: 182 AddProblem("error parsing cert output") 183 return None 184 return cert 185 finally: 186 Pop() 187 188 189class APK(object): 190 191 def __init__(self, full_filename, filename): 192 self.filename = filename 193 self.cert_digests = frozenset() 194 self.shared_uid = None 195 self.package = None 196 197 Push(filename+":") 198 try: 199 self.RecordCerts(full_filename) 200 self.ReadManifest(full_filename) 201 finally: 202 Pop() 203 204 def ReadCertsDeprecated(self, full_filename): 205 print("reading certs in deprecated way for {}".format(full_filename)) 206 cert_digests = set() 207 with zipfile.ZipFile(full_filename) as apk: 208 for info in apk.infolist(): 209 filename = info.filename 210 if (filename.startswith("META-INF/") and 211 info.filename.endswith((".DSA", ".RSA"))): 212 pkcs7 = apk.read(filename) 213 cert = CertFromPKCS7(pkcs7, filename) 214 if not cert: 215 continue 216 cert_sha1 = common.sha1(cert).hexdigest() 217 cert_subject = GetCertSubject(cert) 218 ALL_CERTS.Add(cert_sha1, cert_subject) 219 cert_digests.add(cert_sha1) 220 if not cert_digests: 221 AddProblem("No signature found") 222 return 223 self.cert_digests = frozenset(cert_digests) 224 225 def RecordCerts(self, full_filename): 226 """Parse and save the signature of an apk file.""" 227 228 # Dump the cert info with apksigner 229 cmd = ["apksigner", "verify", "--print-certs", full_filename] 230 p = common.Run(cmd, stdout=subprocess.PIPE) 231 output, _ = p.communicate() 232 if p.returncode != 0: 233 self.ReadCertsDeprecated(full_filename) 234 return 235 236 # Sample output: 237 # Signer #1 certificate DN: ... 238 # Signer #1 certificate SHA-256 digest: ... 239 # Signer #1 certificate SHA-1 digest: ... 240 # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87 241 # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67 242 # ... 243 certs_info = {} 244 certificate_regex = re.compile(r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)") 245 for line in output.splitlines(): 246 m = certificate_regex.match(line) 247 if not m: 248 continue 249 signer, key, val = m.group(1), m.group(2), m.group(3) 250 if certs_info.get(signer): 251 certs_info[signer].update({key.strip(): val.strip()}) 252 else: 253 certs_info.update({signer: {key.strip(): val.strip()}}) 254 if not certs_info: 255 AddProblem("Failed to parse cert info") 256 return 257 258 cert_digests = set() 259 for signer, props in certs_info.items(): 260 subject = props.get("certificate DN") 261 digest = props.get("certificate SHA-1 digest") 262 if not subject or not digest: 263 AddProblem("Failed to parse cert subject or digest") 264 return 265 ALL_CERTS.Add(digest, subject) 266 cert_digests.add(digest) 267 self.cert_digests = frozenset(cert_digests) 268 269 def ReadManifest(self, full_filename): 270 p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file", 271 "AndroidManifest.xml"], 272 stdout=subprocess.PIPE) 273 manifest, err = p.communicate() 274 if err: 275 AddProblem("failed to read manifest " + full_filename) 276 return 277 278 self.shared_uid = None 279 self.package = None 280 281 for line in manifest.split("\n"): 282 line = line.strip() 283 m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line) 284 if m: 285 name = m.group(1) 286 if name == "android:sharedUserId": 287 if self.shared_uid is not None: 288 AddProblem("multiple sharedUserId declarations " + full_filename) 289 self.shared_uid = m.group(2) 290 elif name == "package": 291 if self.package is not None: 292 AddProblem("multiple package declarations " + full_filename) 293 self.package = m.group(2) 294 295 if self.package is None: 296 AddProblem("no package declaration " + full_filename) 297 298 299class TargetFiles(object): 300 def __init__(self): 301 self.max_pkg_len = 30 302 self.max_fn_len = 20 303 self.apks = None 304 self.apks_by_basename = None 305 self.certmap = None 306 307 def LoadZipFile(self, filename): 308 # First read the APK certs file to figure out whether there are compressed 309 # APKs in the archive. If we do have compressed APKs in the archive, then we 310 # must decompress them individually before we perform any analysis. 311 312 # This is the list of wildcards of files we extract from |filename|. 313 apk_extensions = ['*.apk', '*.apex'] 314 315 with zipfile.ZipFile(filename) as input_zip: 316 self.certmap, compressed_extension = common.ReadApkCerts(input_zip) 317 if compressed_extension: 318 apk_extensions.append('*.apk' + compressed_extension) 319 320 d = common.UnzipTemp(filename, apk_extensions) 321 self.apks = {} 322 self.apks_by_basename = {} 323 for dirpath, _, filenames in os.walk(d): 324 for fn in filenames: 325 # Decompress compressed APKs before we begin processing them. 326 if compressed_extension and fn.endswith(compressed_extension): 327 # First strip the compressed extension from the file. 328 uncompressed_fn = fn[:-len(compressed_extension)] 329 330 # Decompress the compressed file to the output file. 331 common.Gunzip(os.path.join(dirpath, fn), 332 os.path.join(dirpath, uncompressed_fn)) 333 334 # Finally, delete the compressed file and use the uncompressed file 335 # for further processing. Note that the deletion is not strictly 336 # required, but is done here to ensure that we're not using too much 337 # space in the temporary directory. 338 os.remove(os.path.join(dirpath, fn)) 339 fn = uncompressed_fn 340 341 if fn.endswith(('.apk', '.apex')): 342 fullname = os.path.join(dirpath, fn) 343 displayname = fullname[len(d)+1:] 344 apk = APK(fullname, displayname) 345 self.apks[apk.filename] = apk 346 self.apks_by_basename[os.path.basename(apk.filename)] = apk 347 if apk.package: 348 self.max_pkg_len = max(self.max_pkg_len, len(apk.package)) 349 self.max_fn_len = max(self.max_fn_len, len(apk.filename)) 350 351 def CheckSharedUids(self): 352 """Look for any instances where packages signed with different 353 certs request the same sharedUserId.""" 354 apks_by_uid = {} 355 for apk in self.apks.values(): 356 if apk.shared_uid: 357 apks_by_uid.setdefault(apk.shared_uid, []).append(apk) 358 359 for uid in sorted(apks_by_uid): 360 apks = apks_by_uid[uid] 361 for apk in apks[1:]: 362 if apk.certs != apks[0].certs: 363 break 364 else: 365 # all packages have the same set of certs; this uid is fine. 366 continue 367 368 AddProblem("different cert sets for packages with uid %s" % (uid,)) 369 370 print("uid %s is shared by packages with different cert sets:" % (uid,)) 371 for apk in apks: 372 print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename)) 373 for digest in apk.cert_digests: 374 print(" ", ALL_CERTS.Get(digest)) 375 print() 376 377 def CheckExternalSignatures(self): 378 for apk_filename, certname in self.certmap.items(): 379 if certname == "EXTERNAL": 380 # Apps marked EXTERNAL should be signed with the test key 381 # during development, then manually re-signed after 382 # predexopting. Consider it an error if this app is now 383 # signed with any key that is present in our tree. 384 apk = self.apks_by_basename[apk_filename] 385 signed_with_external = False 386 for digest in apk.cert_digests: 387 name = ALL_CERTS.Get(digest) 388 if name and name.startswith("unknown "): 389 signed_with_external = True 390 391 if not signed_with_external: 392 Push(apk.filename) 393 AddProblem("hasn't been signed with EXTERNAL cert") 394 Pop() 395 396 def PrintCerts(self): 397 """Display a table of packages grouped by cert.""" 398 by_digest = {} 399 for apk in self.apks.values(): 400 for digest in apk.cert_digests: 401 if apk.package: 402 by_digest.setdefault(digest, []).append((apk.package, apk)) 403 404 order = [(-len(v), k) for (k, v) in by_digest.items()] 405 order.sort() 406 407 for _, digest in order: 408 print("%s:" % (ALL_CERTS.Get(digest),)) 409 apks = by_digest[digest] 410 apks.sort(key=lambda x: x[0]) 411 for i in range(1, len(apks)): 412 pkgname, apk = apks[i] 413 if pkgname == apks[i-1][0]: 414 print("Both {} and {} have same package name {}".format( 415 apk.filename, apks[i-1][1].filename, pkgname)) 416 for _, apk in apks: 417 if apk.shared_uid: 418 print(" %-*s %-*s [%s]" % (self.max_fn_len, apk.filename, 419 self.max_pkg_len, apk.package, 420 apk.shared_uid)) 421 else: 422 print(" %-*s %s" % (self.max_fn_len, apk.filename, apk.package)) 423 print() 424 425 def CompareWith(self, other): 426 """Look for instances where a given package that exists in both 427 self and other have different certs.""" 428 429 all_apks = set(self.apks.keys()) 430 all_apks.update(other.apks.keys()) 431 432 max_pkg_len = max(self.max_pkg_len, other.max_pkg_len) 433 434 by_digestpair = {} 435 436 for i in all_apks: 437 if i in self.apks: 438 if i in other.apks: 439 # in both; should have same set of certs 440 if self.apks[i].cert_digests != other.apks[i].cert_digests: 441 by_digestpair.setdefault((other.apks[i].cert_digests, 442 self.apks[i].cert_digests), []).append(i) 443 else: 444 print("%s [%s]: new APK (not in comparison target_files)" % ( 445 i, self.apks[i].filename)) 446 else: 447 if i in other.apks: 448 print("%s [%s]: removed APK (only in comparison target_files)" % ( 449 i, other.apks[i].filename)) 450 451 if by_digestpair: 452 AddProblem("some APKs changed certs") 453 Banner("APK signing differences") 454 for (old, new), packages in sorted(by_digestpair.items()): 455 for i, o in enumerate(old): 456 if i == 0: 457 print("was", ALL_CERTS.Get(o)) 458 else: 459 print(" ", ALL_CERTS.Get(o)) 460 for i, n in enumerate(new): 461 if i == 0: 462 print("now", ALL_CERTS.Get(n)) 463 else: 464 print(" ", ALL_CERTS.Get(n)) 465 for i in sorted(packages): 466 old_fn = other.apks[i].filename 467 new_fn = self.apks[i].filename 468 if old_fn == new_fn: 469 print(" %-*s [%s]" % (max_pkg_len, i, old_fn)) 470 else: 471 print(" %-*s [was: %s; now: %s]" % (max_pkg_len, i, 472 old_fn, new_fn)) 473 print() 474 475 476def main(argv): 477 def option_handler(o, a): 478 if o in ("-c", "--compare_with"): 479 OPTIONS.compare_with = a 480 elif o in ("-l", "--local_cert_dirs"): 481 OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")] 482 elif o in ("-t", "--text"): 483 OPTIONS.text = True 484 else: 485 return False 486 return True 487 488 args = common.ParseOptions(argv, __doc__, 489 extra_opts="c:l:t", 490 extra_long_opts=["compare_with=", 491 "local_cert_dirs="], 492 extra_option_handler=option_handler) 493 494 if len(args) != 1: 495 common.Usage(__doc__) 496 sys.exit(1) 497 498 common.InitLogging() 499 500 ALL_CERTS.FindLocalCerts() 501 502 Push("input target_files:") 503 try: 504 target_files = TargetFiles() 505 target_files.LoadZipFile(args[0]) 506 finally: 507 Pop() 508 509 compare_files = None 510 if OPTIONS.compare_with: 511 Push("comparison target_files:") 512 try: 513 compare_files = TargetFiles() 514 compare_files.LoadZipFile(OPTIONS.compare_with) 515 finally: 516 Pop() 517 518 if OPTIONS.text or not compare_files: 519 Banner("target files") 520 target_files.PrintCerts() 521 target_files.CheckSharedUids() 522 target_files.CheckExternalSignatures() 523 if compare_files: 524 if OPTIONS.text: 525 Banner("comparison files") 526 compare_files.PrintCerts() 527 target_files.CompareWith(compare_files) 528 529 if PROBLEMS: 530 print("%d problem(s) found:\n" % (len(PROBLEMS),)) 531 for p in PROBLEMS: 532 print(p) 533 return 1 534 535 return 0 536 537 538if __name__ == '__main__': 539 try: 540 r = main(sys.argv[1:]) 541 sys.exit(r) 542 finally: 543 common.Cleanup() 544