1#!/usr/bin/env python 2# 3# Copyright (C) 2009 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Check the signatures of all APKs in a target_files .zip file. With 19-c, compare the signatures of each package to the ones in a separate 20target_files (usually a previously distributed build for the same 21device) and flag any changes. 22 23Usage: check_target_file_signatures [flags] target_files 24 25 -c (--compare_with) <other_target_files> 26 Look for compatibility problems between the two sets of target 27 files (eg., packages whose keys have changed). 28 29 -l (--local_cert_dirs) <dir,dir,...> 30 Comma-separated list of top-level directories to scan for 31 .x509.pem files. Defaults to "vendor,build". Where cert files 32 can be found that match APK signatures, the filename will be 33 printed as the cert name, otherwise a hash of the cert plus its 34 subject string will be printed instead. 35 36 -t (--text) 37 Dump the certificate information for both packages in comparison 38 mode (this output is normally suppressed). 39 40""" 41 42from __future__ import print_function 43 44import logging 45import os 46import os.path 47import re 48import subprocess 49import sys 50import zipfile 51 52import common 53 54if sys.hexversion < 0x02070000: 55 print("Python 2.7 or newer is required.", file=sys.stderr) 56 sys.exit(1) 57 58 59logger = logging.getLogger(__name__) 60 61# Work around a bug in Python's zipfile module that prevents opening of zipfiles 62# if any entry has an extra field of between 1 and 3 bytes (which is common with 63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which 64# contains the bug) with an empty version (since we don't need to decode the 65# extra field anyway). 66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and 67# Python 3.5.0 alpha 1. 68 69 70class MyZipInfo(zipfile.ZipInfo): 71 def _decodeExtra(self): 72 pass 73 74 75zipfile.ZipInfo = MyZipInfo 76 77 78OPTIONS = common.OPTIONS 79 80OPTIONS.text = False 81OPTIONS.compare_with = None 82OPTIONS.local_cert_dirs = ("vendor", "build") 83 84PROBLEMS = [] 85PROBLEM_PREFIX = [] 86 87 88def AddProblem(msg): 89 logger.error(msg) 90 PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg) 91 92 93def Push(msg): 94 PROBLEM_PREFIX.append(msg) 95 96 97def Pop(): 98 PROBLEM_PREFIX.pop() 99 100 101def Banner(msg): 102 print("-" * 70) 103 print(" ", msg) 104 print("-" * 70) 105 106 107def GetCertSubject(cert): 108 p = common.Run(["openssl", "x509", "-inform", "DER", "-text"], 109 stdin=subprocess.PIPE, 110 stdout=subprocess.PIPE, 111 universal_newlines=False) 112 out, err = p.communicate(cert) 113 if err and not err.strip(): 114 return "(error reading cert subject)" 115 for line in out.decode().split("\n"): 116 line = line.strip() 117 if line.startswith("Subject:"): 118 return line[8:].strip() 119 return "(unknown cert subject)" 120 121 122class CertDB(object): 123 124 def __init__(self): 125 self.certs = {} 126 127 def Add(self, cert_digest, subject, name=None): 128 if cert_digest in self.certs: 129 if name: 130 self.certs[cert_digest] = self.certs[cert_digest] + "," + name 131 else: 132 if name is None: 133 name = "unknown cert %s (%s)" % (cert_digest[:12], subject) 134 self.certs[cert_digest] = name 135 136 def Get(self, cert_digest): 137 """Return the name for a given cert digest.""" 138 return self.certs.get(cert_digest, None) 139 140 def FindLocalCerts(self): 141 to_load = [] 142 for top in OPTIONS.local_cert_dirs: 143 for dirpath, _, filenames in os.walk(top): 144 certs = [os.path.join(dirpath, i) 145 for i in filenames if i.endswith(".x509.pem")] 146 if certs: 147 to_load.extend(certs) 148 149 for i in to_load: 150 with open(i) as f: 151 cert = common.ParseCertificate(f.read()) 152 name, _ = os.path.splitext(i) 153 name, _ = os.path.splitext(name) 154 155 cert_sha1 = common.sha1(cert).hexdigest() 156 cert_subject = GetCertSubject(cert) 157 self.Add(cert_sha1, cert_subject, name) 158 159 160ALL_CERTS = CertDB() 161 162 163def CertFromPKCS7(data, filename): 164 """Read the cert out of a PKCS#7-format file (which is what is 165 stored in a signed .apk).""" 166 Push(filename + ":") 167 try: 168 p = common.Run(["openssl", "pkcs7", 169 "-inform", "DER", 170 "-outform", "PEM", 171 "-print_certs"], 172 stdin=subprocess.PIPE, 173 stdout=subprocess.PIPE, 174 universal_newlines=False) 175 out, err = p.communicate(data) 176 if err and not err.strip(): 177 AddProblem("error reading cert:\n" + err.decode()) 178 return None 179 180 cert = common.ParseCertificate(out.decode()) 181 if not cert: 182 AddProblem("error parsing cert output") 183 return None 184 return cert 185 finally: 186 Pop() 187 188 189class APK(object): 190 191 def __init__(self, full_filename, filename): 192 self.filename = filename 193 self.cert_digests = frozenset() 194 self.shared_uid = None 195 self.package = None 196 197 Push(filename+":") 198 try: 199 self.RecordCerts(full_filename) 200 self.ReadManifest(full_filename) 201 finally: 202 Pop() 203 204 def ReadCertsDeprecated(self, full_filename): 205 print("reading certs in deprecated way for {}".format(full_filename)) 206 cert_digests = set() 207 with zipfile.ZipFile(full_filename) as apk: 208 for info in apk.infolist(): 209 filename = info.filename 210 if (filename.startswith("META-INF/") and 211 info.filename.endswith((".DSA", ".RSA"))): 212 pkcs7 = apk.read(filename) 213 cert = CertFromPKCS7(pkcs7, filename) 214 if not cert: 215 continue 216 cert_sha1 = common.sha1(cert).hexdigest() 217 cert_subject = GetCertSubject(cert) 218 ALL_CERTS.Add(cert_sha1, cert_subject) 219 cert_digests.add(cert_sha1) 220 if not cert_digests: 221 AddProblem("No signature found") 222 return 223 self.cert_digests = frozenset(cert_digests) 224 225 def RecordCerts(self, full_filename): 226 """Parse and save the signature of an apk file.""" 227 228 # Dump the cert info with apksigner 229 cmd = ["apksigner", "verify", "--print-certs", full_filename] 230 p = common.Run(cmd, stdout=subprocess.PIPE) 231 output, _ = p.communicate() 232 if p.returncode != 0: 233 self.ReadCertsDeprecated(full_filename) 234 return 235 236 # Sample output: 237 # Signer #1 certificate DN: ... 238 # Signer #1 certificate SHA-256 digest: ... 239 # Signer #1 certificate SHA-1 digest: ... 240 # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87 241 # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67 242 # ... 243 certs_info = {} 244 certificate_regex = re.compile( 245 r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)") 246 for line in output.splitlines(): 247 m = certificate_regex.match(line) 248 if not m: 249 continue 250 signer, key, val = m.group(1), m.group(2), m.group(3) 251 if certs_info.get(signer): 252 certs_info[signer].update({key.strip(): val.strip()}) 253 else: 254 certs_info.update({signer: {key.strip(): val.strip()}}) 255 if not certs_info: 256 AddProblem("Failed to parse cert info") 257 return 258 259 cert_digests = set() 260 for signer, props in certs_info.items(): 261 subject = props.get("certificate DN") 262 digest = props.get("certificate SHA-1 digest") 263 if not subject or not digest: 264 AddProblem("Failed to parse cert subject or digest") 265 return 266 ALL_CERTS.Add(digest, subject) 267 cert_digests.add(digest) 268 self.cert_digests = frozenset(cert_digests) 269 270 def ReadManifest(self, full_filename): 271 p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file", 272 "AndroidManifest.xml"], 273 stdout=subprocess.PIPE) 274 manifest, err = p.communicate() 275 if err: 276 AddProblem("failed to read manifest " + full_filename) 277 return 278 279 self.shared_uid = None 280 self.package = None 281 282 for line in manifest.split("\n"): 283 line = line.strip() 284 m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line) 285 if m: 286 name = m.group(1) 287 if name == "android:sharedUserId": 288 if self.shared_uid is not None: 289 AddProblem("multiple sharedUserId declarations " + full_filename) 290 self.shared_uid = m.group(2) 291 elif name == "package": 292 if self.package is not None: 293 AddProblem("multiple package declarations " + full_filename) 294 self.package = m.group(2) 295 296 if self.package is None: 297 AddProblem("no package declaration " + full_filename) 298 299 300class TargetFiles(object): 301 def __init__(self): 302 self.max_pkg_len = 30 303 self.max_fn_len = 20 304 self.apks = None 305 self.apks_by_basename = None 306 self.certmap = None 307 308 def LoadZipFile(self, filename): 309 # First read the APK certs file to figure out whether there are compressed 310 # APKs in the archive. If we do have compressed APKs in the archive, then we 311 # must decompress them individually before we perform any analysis. 312 313 # This is the list of wildcards of files we extract from |filename|. 314 apk_extensions = ['*.apk', '*.apex'] 315 316 with zipfile.ZipFile(filename, "r") as input_zip: 317 self.certmap, compressed_extension = common.ReadApkCerts(input_zip) 318 if compressed_extension: 319 apk_extensions.append('*.apk' + compressed_extension) 320 321 d = common.UnzipTemp(filename, apk_extensions) 322 self.apks = {} 323 self.apks_by_basename = {} 324 for dirpath, _, filenames in os.walk(d): 325 for fn in filenames: 326 # Decompress compressed APKs before we begin processing them. 327 if compressed_extension and fn.endswith(compressed_extension): 328 # First strip the compressed extension from the file. 329 uncompressed_fn = fn[:-len(compressed_extension)] 330 331 # Decompress the compressed file to the output file. 332 common.Gunzip(os.path.join(dirpath, fn), 333 os.path.join(dirpath, uncompressed_fn)) 334 335 # Finally, delete the compressed file and use the uncompressed file 336 # for further processing. Note that the deletion is not strictly 337 # required, but is done here to ensure that we're not using too much 338 # space in the temporary directory. 339 os.remove(os.path.join(dirpath, fn)) 340 fn = uncompressed_fn 341 342 if fn.endswith(('.apk', '.apex')): 343 fullname = os.path.join(dirpath, fn) 344 displayname = fullname[len(d)+1:] 345 apk = APK(fullname, displayname) 346 self.apks[apk.filename] = apk 347 self.apks_by_basename[os.path.basename(apk.filename)] = apk 348 if apk.package: 349 self.max_pkg_len = max(self.max_pkg_len, len(apk.package)) 350 self.max_fn_len = max(self.max_fn_len, len(apk.filename)) 351 352 def CheckSharedUids(self): 353 """Look for any instances where packages signed with different 354 certs request the same sharedUserId.""" 355 apks_by_uid = {} 356 for apk in self.apks.values(): 357 if apk.shared_uid: 358 apks_by_uid.setdefault(apk.shared_uid, []).append(apk) 359 360 for uid in sorted(apks_by_uid): 361 apks = apks_by_uid[uid] 362 for apk in apks[1:]: 363 if apk.certs != apks[0].certs: 364 break 365 else: 366 # all packages have the same set of certs; this uid is fine. 367 continue 368 369 AddProblem("different cert sets for packages with uid %s" % (uid,)) 370 371 print("uid %s is shared by packages with different cert sets:" % (uid,)) 372 for apk in apks: 373 print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename)) 374 for digest in apk.cert_digests: 375 print(" ", ALL_CERTS.Get(digest)) 376 print() 377 378 def CheckExternalSignatures(self): 379 for apk_filename, certname in self.certmap.items(): 380 if certname == "EXTERNAL": 381 # Apps marked EXTERNAL should be signed with the test key 382 # during development, then manually re-signed after 383 # predexopting. Consider it an error if this app is now 384 # signed with any key that is present in our tree. 385 apk = self.apks_by_basename[apk_filename] 386 signed_with_external = False 387 for digest in apk.cert_digests: 388 name = ALL_CERTS.Get(digest) 389 if name and name.startswith("unknown "): 390 signed_with_external = True 391 392 if not signed_with_external: 393 Push(apk.filename) 394 AddProblem("hasn't been signed with EXTERNAL cert") 395 Pop() 396 397 def PrintCerts(self): 398 """Display a table of packages grouped by cert.""" 399 by_digest = {} 400 for apk in self.apks.values(): 401 for digest in apk.cert_digests: 402 if apk.package: 403 by_digest.setdefault(digest, []).append((apk.package, apk)) 404 405 order = [(-len(v), k) for (k, v) in by_digest.items()] 406 order.sort() 407 408 for _, digest in order: 409 print("%s:" % (ALL_CERTS.Get(digest),)) 410 apks = by_digest[digest] 411 apks.sort(key=lambda x: x[0]) 412 for i in range(1, len(apks)): 413 pkgname, apk = apks[i] 414 if pkgname == apks[i-1][0]: 415 print("Both {} and {} have same package name {}".format( 416 apk.filename, apks[i-1][1].filename, pkgname)) 417 for _, apk in apks: 418 if apk.shared_uid: 419 print(" %-*s %-*s [%s]" % (self.max_fn_len, apk.filename, 420 self.max_pkg_len, apk.package, 421 apk.shared_uid)) 422 else: 423 print(" %-*s %s" % (self.max_fn_len, apk.filename, apk.package)) 424 print() 425 426 def CompareWith(self, other): 427 """Look for instances where a given package that exists in both 428 self and other have different certs.""" 429 430 all_apks = set(self.apks.keys()) 431 all_apks.update(other.apks.keys()) 432 433 max_pkg_len = max(self.max_pkg_len, other.max_pkg_len) 434 435 by_digestpair = {} 436 437 for i in all_apks: 438 if i in self.apks: 439 if i in other.apks: 440 # in both; should have same set of certs 441 if self.apks[i].cert_digests != other.apks[i].cert_digests: 442 by_digestpair.setdefault((other.apks[i].cert_digests, 443 self.apks[i].cert_digests), []).append(i) 444 else: 445 print("%s [%s]: new APK (not in comparison target_files)" % ( 446 i, self.apks[i].filename)) 447 else: 448 if i in other.apks: 449 print("%s [%s]: removed APK (only in comparison target_files)" % ( 450 i, other.apks[i].filename)) 451 452 if by_digestpair: 453 AddProblem("some APKs changed certs") 454 Banner("APK signing differences") 455 for (old, new), packages in sorted(by_digestpair.items()): 456 for i, o in enumerate(old): 457 if i == 0: 458 print("was", ALL_CERTS.Get(o)) 459 else: 460 print(" ", ALL_CERTS.Get(o)) 461 for i, n in enumerate(new): 462 if i == 0: 463 print("now", ALL_CERTS.Get(n)) 464 else: 465 print(" ", ALL_CERTS.Get(n)) 466 for i in sorted(packages): 467 old_fn = other.apks[i].filename 468 new_fn = self.apks[i].filename 469 if old_fn == new_fn: 470 print(" %-*s [%s]" % (max_pkg_len, i, old_fn)) 471 else: 472 print(" %-*s [was: %s; now: %s]" % (max_pkg_len, i, 473 old_fn, new_fn)) 474 print() 475 476 477def main(argv): 478 def option_handler(o, a): 479 if o in ("-c", "--compare_with"): 480 OPTIONS.compare_with = a 481 elif o in ("-l", "--local_cert_dirs"): 482 OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")] 483 elif o in ("-t", "--text"): 484 OPTIONS.text = True 485 else: 486 return False 487 return True 488 489 args = common.ParseOptions(argv, __doc__, 490 extra_opts="c:l:t", 491 extra_long_opts=["compare_with=", 492 "local_cert_dirs="], 493 extra_option_handler=option_handler) 494 495 if len(args) != 1: 496 common.Usage(__doc__) 497 sys.exit(1) 498 499 common.InitLogging() 500 501 ALL_CERTS.FindLocalCerts() 502 503 Push("input target_files:") 504 try: 505 target_files = TargetFiles() 506 target_files.LoadZipFile(args[0]) 507 finally: 508 Pop() 509 510 compare_files = None 511 if OPTIONS.compare_with: 512 Push("comparison target_files:") 513 try: 514 compare_files = TargetFiles() 515 compare_files.LoadZipFile(OPTIONS.compare_with) 516 finally: 517 Pop() 518 519 if OPTIONS.text or not compare_files: 520 Banner("target files") 521 target_files.PrintCerts() 522 target_files.CheckSharedUids() 523 target_files.CheckExternalSignatures() 524 if compare_files: 525 if OPTIONS.text: 526 Banner("comparison files") 527 compare_files.PrintCerts() 528 target_files.CompareWith(compare_files) 529 530 if PROBLEMS: 531 print("%d problem(s) found:\n" % (len(PROBLEMS),)) 532 for p in PROBLEMS: 533 print(p) 534 return 1 535 536 return 0 537 538 539if __name__ == '__main__': 540 try: 541 r = main(sys.argv[1:]) 542 sys.exit(r) 543 finally: 544 common.Cleanup() 545