• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42from __future__ import print_function
43
44import logging
45import os
46import os.path
47import re
48import subprocess
49import sys
50import zipfile
51
52import common
53
54if sys.hexversion < 0x02070000:
55  print("Python 2.7 or newer is required.", file=sys.stderr)
56  sys.exit(1)
57
58
59logger = logging.getLogger(__name__)
60
61# Work around a bug in Python's zipfile module that prevents opening of zipfiles
62# if any entry has an extra field of between 1 and 3 bytes (which is common with
63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
64# contains the bug) with an empty version (since we don't need to decode the
65# extra field anyway).
66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
67# Python 3.5.0 alpha 1.
68
69
70class MyZipInfo(zipfile.ZipInfo):
71  def _decodeExtra(self):
72    pass
73
74
75zipfile.ZipInfo = MyZipInfo
76
77
78OPTIONS = common.OPTIONS
79
80OPTIONS.text = False
81OPTIONS.compare_with = None
82OPTIONS.local_cert_dirs = ("vendor", "build")
83
84PROBLEMS = []
85PROBLEM_PREFIX = []
86
87
88def AddProblem(msg):
89  logger.error(msg)
90  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
91
92
93def Push(msg):
94  PROBLEM_PREFIX.append(msg)
95
96
97def Pop():
98  PROBLEM_PREFIX.pop()
99
100
101def Banner(msg):
102  print("-" * 70)
103  print("  ", msg)
104  print("-" * 70)
105
106
107def GetCertSubject(cert):
108  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
109                 stdin=subprocess.PIPE,
110                 stdout=subprocess.PIPE,
111                 universal_newlines=False)
112  out, err = p.communicate(cert)
113  if err and not err.strip():
114    return "(error reading cert subject)"
115  for line in out.decode().split("\n"):
116    line = line.strip()
117    if line.startswith("Subject:"):
118      return line[8:].strip()
119  return "(unknown cert subject)"
120
121
122class CertDB(object):
123
124  def __init__(self):
125    self.certs = {}
126
127  def Add(self, cert_digest, subject, name=None):
128    if cert_digest in self.certs:
129      if name:
130        self.certs[cert_digest] = self.certs[cert_digest] + "," + name
131    else:
132      if name is None:
133        name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
134      self.certs[cert_digest] = name
135
136  def Get(self, cert_digest):
137    """Return the name for a given cert digest."""
138    return self.certs.get(cert_digest, None)
139
140  def FindLocalCerts(self):
141    to_load = []
142    for top in OPTIONS.local_cert_dirs:
143      for dirpath, _, filenames in os.walk(top):
144        certs = [os.path.join(dirpath, i)
145                 for i in filenames if i.endswith(".x509.pem")]
146        if certs:
147          to_load.extend(certs)
148
149    for i in to_load:
150      with open(i) as f:
151        cert = common.ParseCertificate(f.read())
152      name, _ = os.path.splitext(i)
153      name, _ = os.path.splitext(name)
154
155      cert_sha1 = common.sha1(cert).hexdigest()
156      cert_subject = GetCertSubject(cert)
157      self.Add(cert_sha1, cert_subject, name)
158
159
160ALL_CERTS = CertDB()
161
162
163def CertFromPKCS7(data, filename):
164  """Read the cert out of a PKCS#7-format file (which is what is
165  stored in a signed .apk)."""
166  Push(filename + ":")
167  try:
168    p = common.Run(["openssl", "pkcs7",
169                    "-inform", "DER",
170                    "-outform", "PEM",
171                    "-print_certs"],
172                   stdin=subprocess.PIPE,
173                   stdout=subprocess.PIPE,
174                   universal_newlines=False)
175    out, err = p.communicate(data)
176    if err and not err.strip():
177      AddProblem("error reading cert:\n" + err.decode())
178      return None
179
180    cert = common.ParseCertificate(out.decode())
181    if not cert:
182      AddProblem("error parsing cert output")
183      return None
184    return cert
185  finally:
186    Pop()
187
188
189class APK(object):
190
191  def __init__(self, full_filename, filename):
192    self.filename = filename
193    self.cert_digests = frozenset()
194    self.shared_uid = None
195    self.package = None
196
197    Push(filename+":")
198    try:
199      self.RecordCerts(full_filename)
200      self.ReadManifest(full_filename)
201    finally:
202      Pop()
203
204  def ReadCertsDeprecated(self, full_filename):
205    print("reading certs in deprecated way for {}".format(full_filename))
206    cert_digests = set()
207    with zipfile.ZipFile(full_filename) as apk:
208      for info in apk.infolist():
209        filename = info.filename
210        if (filename.startswith("META-INF/") and
211                info.filename.endswith((".DSA", ".RSA"))):
212          pkcs7 = apk.read(filename)
213          cert = CertFromPKCS7(pkcs7, filename)
214          if not cert:
215            continue
216          cert_sha1 = common.sha1(cert).hexdigest()
217          cert_subject = GetCertSubject(cert)
218          ALL_CERTS.Add(cert_sha1, cert_subject)
219          cert_digests.add(cert_sha1)
220    if not cert_digests:
221      AddProblem("No signature found")
222      return
223    self.cert_digests = frozenset(cert_digests)
224
225  def RecordCerts(self, full_filename):
226    """Parse and save the signature of an apk file."""
227
228    # Dump the cert info with apksigner
229    cmd = ["apksigner", "verify", "--print-certs", full_filename]
230    p = common.Run(cmd, stdout=subprocess.PIPE)
231    output, _ = p.communicate()
232    if p.returncode != 0:
233      self.ReadCertsDeprecated(full_filename)
234      return
235
236    # Sample output:
237    # Signer #1 certificate DN: ...
238    # Signer #1 certificate SHA-256 digest: ...
239    # Signer #1 certificate SHA-1 digest: ...
240    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87
241    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67
242    # ...
243    certs_info = {}
244    certificate_regex = re.compile(
245        r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)")
246    for line in output.splitlines():
247      m = certificate_regex.match(line)
248      if not m:
249        continue
250      signer, key, val = m.group(1), m.group(2), m.group(3)
251      if certs_info.get(signer):
252        certs_info[signer].update({key.strip(): val.strip()})
253      else:
254        certs_info.update({signer: {key.strip(): val.strip()}})
255    if not certs_info:
256      AddProblem("Failed to parse cert info")
257      return
258
259    cert_digests = set()
260    for signer, props in certs_info.items():
261      subject = props.get("certificate DN")
262      digest = props.get("certificate SHA-1 digest")
263      if not subject or not digest:
264        AddProblem("Failed to parse cert subject or digest")
265        return
266      ALL_CERTS.Add(digest, subject)
267      cert_digests.add(digest)
268    self.cert_digests = frozenset(cert_digests)
269
270  def ReadManifest(self, full_filename):
271    p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
272                    "AndroidManifest.xml"],
273                   stdout=subprocess.PIPE)
274    manifest, err = p.communicate()
275    if err:
276      AddProblem("failed to read manifest " + full_filename)
277      return
278
279    self.shared_uid = None
280    self.package = None
281
282    for line in manifest.split("\n"):
283      line = line.strip()
284      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
285      if m:
286        name = m.group(1)
287        if name == "android:sharedUserId":
288          if self.shared_uid is not None:
289            AddProblem("multiple sharedUserId declarations " + full_filename)
290          self.shared_uid = m.group(2)
291        elif name == "package":
292          if self.package is not None:
293            AddProblem("multiple package declarations " + full_filename)
294          self.package = m.group(2)
295
296    if self.package is None:
297      AddProblem("no package declaration " + full_filename)
298
299
300class TargetFiles(object):
301  def __init__(self):
302    self.max_pkg_len = 30
303    self.max_fn_len = 20
304    self.apks = None
305    self.apks_by_basename = None
306    self.certmap = None
307
308  def LoadZipFile(self, filename):
309    # First read the APK certs file to figure out whether there are compressed
310    # APKs in the archive. If we do have compressed APKs in the archive, then we
311    # must decompress them individually before we perform any analysis.
312
313    # This is the list of wildcards of files we extract from |filename|.
314    apk_extensions = ['*.apk', '*.apex']
315
316    with zipfile.ZipFile(filename, "r") as input_zip:
317      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
318    if compressed_extension:
319      apk_extensions.append('*.apk' + compressed_extension)
320
321    d = common.UnzipTemp(filename, apk_extensions)
322    self.apks = {}
323    self.apks_by_basename = {}
324    for dirpath, _, filenames in os.walk(d):
325      for fn in filenames:
326        # Decompress compressed APKs before we begin processing them.
327        if compressed_extension and fn.endswith(compressed_extension):
328          # First strip the compressed extension from the file.
329          uncompressed_fn = fn[:-len(compressed_extension)]
330
331          # Decompress the compressed file to the output file.
332          common.Gunzip(os.path.join(dirpath, fn),
333                        os.path.join(dirpath, uncompressed_fn))
334
335          # Finally, delete the compressed file and use the uncompressed file
336          # for further processing. Note that the deletion is not strictly
337          # required, but is done here to ensure that we're not using too much
338          # space in the temporary directory.
339          os.remove(os.path.join(dirpath, fn))
340          fn = uncompressed_fn
341
342        if fn.endswith(('.apk', '.apex')):
343          fullname = os.path.join(dirpath, fn)
344          displayname = fullname[len(d)+1:]
345          apk = APK(fullname, displayname)
346          self.apks[apk.filename] = apk
347          self.apks_by_basename[os.path.basename(apk.filename)] = apk
348          if apk.package:
349            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
350          self.max_fn_len = max(self.max_fn_len, len(apk.filename))
351
352  def CheckSharedUids(self):
353    """Look for any instances where packages signed with different
354    certs request the same sharedUserId."""
355    apks_by_uid = {}
356    for apk in self.apks.values():
357      if apk.shared_uid:
358        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
359
360    for uid in sorted(apks_by_uid):
361      apks = apks_by_uid[uid]
362      for apk in apks[1:]:
363        if apk.certs != apks[0].certs:
364          break
365      else:
366        # all packages have the same set of certs; this uid is fine.
367        continue
368
369      AddProblem("different cert sets for packages with uid %s" % (uid,))
370
371      print("uid %s is shared by packages with different cert sets:" % (uid,))
372      for apk in apks:
373        print("%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename))
374        for digest in apk.cert_digests:
375          print("   ", ALL_CERTS.Get(digest))
376      print()
377
378  def CheckExternalSignatures(self):
379    for apk_filename, certname in self.certmap.items():
380      if certname == "EXTERNAL":
381        # Apps marked EXTERNAL should be signed with the test key
382        # during development, then manually re-signed after
383        # predexopting.  Consider it an error if this app is now
384        # signed with any key that is present in our tree.
385        apk = self.apks_by_basename[apk_filename]
386        signed_with_external = False
387        for digest in apk.cert_digests:
388          name = ALL_CERTS.Get(digest)
389          if name and name.startswith("unknown "):
390            signed_with_external = True
391
392        if not signed_with_external:
393          Push(apk.filename)
394          AddProblem("hasn't been signed with EXTERNAL cert")
395          Pop()
396
397  def PrintCerts(self):
398    """Display a table of packages grouped by cert."""
399    by_digest = {}
400    for apk in self.apks.values():
401      for digest in apk.cert_digests:
402        if apk.package:
403          by_digest.setdefault(digest, []).append((apk.package, apk))
404
405    order = [(-len(v), k) for (k, v) in by_digest.items()]
406    order.sort()
407
408    for _, digest in order:
409      print("%s:" % (ALL_CERTS.Get(digest),))
410      apks = by_digest[digest]
411      apks.sort(key=lambda x: x[0])
412      for i in range(1, len(apks)):
413        pkgname, apk = apks[i]
414        if pkgname == apks[i-1][0]:
415          print("Both {} and {} have same package name {}".format(
416              apk.filename, apks[i-1][1].filename, pkgname))
417      for _, apk in apks:
418        if apk.shared_uid:
419          print("  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
420                                        self.max_pkg_len, apk.package,
421                                        apk.shared_uid))
422        else:
423          print("  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package))
424      print()
425
426  def CompareWith(self, other):
427    """Look for instances where a given package that exists in both
428    self and other have different certs."""
429
430    all_apks = set(self.apks.keys())
431    all_apks.update(other.apks.keys())
432
433    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
434
435    by_digestpair = {}
436
437    for i in all_apks:
438      if i in self.apks:
439        if i in other.apks:
440          # in both; should have same set of certs
441          if self.apks[i].cert_digests != other.apks[i].cert_digests:
442            by_digestpair.setdefault((other.apks[i].cert_digests,
443                                      self.apks[i].cert_digests), []).append(i)
444        else:
445          print("%s [%s]: new APK (not in comparison target_files)" % (
446              i, self.apks[i].filename))
447      else:
448        if i in other.apks:
449          print("%s [%s]: removed APK (only in comparison target_files)" % (
450              i, other.apks[i].filename))
451
452    if by_digestpair:
453      AddProblem("some APKs changed certs")
454      Banner("APK signing differences")
455      for (old, new), packages in sorted(by_digestpair.items()):
456        for i, o in enumerate(old):
457          if i == 0:
458            print("was", ALL_CERTS.Get(o))
459          else:
460            print("   ", ALL_CERTS.Get(o))
461        for i, n in enumerate(new):
462          if i == 0:
463            print("now", ALL_CERTS.Get(n))
464          else:
465            print("   ", ALL_CERTS.Get(n))
466        for i in sorted(packages):
467          old_fn = other.apks[i].filename
468          new_fn = self.apks[i].filename
469          if old_fn == new_fn:
470            print("  %-*s  [%s]" % (max_pkg_len, i, old_fn))
471          else:
472            print("  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
473                                                  old_fn, new_fn))
474        print()
475
476
477def main(argv):
478  def option_handler(o, a):
479    if o in ("-c", "--compare_with"):
480      OPTIONS.compare_with = a
481    elif o in ("-l", "--local_cert_dirs"):
482      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
483    elif o in ("-t", "--text"):
484      OPTIONS.text = True
485    else:
486      return False
487    return True
488
489  args = common.ParseOptions(argv, __doc__,
490                             extra_opts="c:l:t",
491                             extra_long_opts=["compare_with=",
492                                              "local_cert_dirs="],
493                             extra_option_handler=option_handler)
494
495  if len(args) != 1:
496    common.Usage(__doc__)
497    sys.exit(1)
498
499  common.InitLogging()
500
501  ALL_CERTS.FindLocalCerts()
502
503  Push("input target_files:")
504  try:
505    target_files = TargetFiles()
506    target_files.LoadZipFile(args[0])
507  finally:
508    Pop()
509
510  compare_files = None
511  if OPTIONS.compare_with:
512    Push("comparison target_files:")
513    try:
514      compare_files = TargetFiles()
515      compare_files.LoadZipFile(OPTIONS.compare_with)
516    finally:
517      Pop()
518
519  if OPTIONS.text or not compare_files:
520    Banner("target files")
521    target_files.PrintCerts()
522  target_files.CheckSharedUids()
523  target_files.CheckExternalSignatures()
524  if compare_files:
525    if OPTIONS.text:
526      Banner("comparison files")
527      compare_files.PrintCerts()
528    target_files.CompareWith(compare_files)
529
530  if PROBLEMS:
531    print("%d problem(s) found:\n" % (len(PROBLEMS),))
532    for p in PROBLEMS:
533      print(p)
534    return 1
535
536  return 0
537
538
539if __name__ == '__main__':
540  try:
541    r = main(sys.argv[1:])
542    sys.exit(r)
543  finally:
544    common.Cleanup()
545